Why not something like:
lines.foreachRDD(rdd => {
  // Convert each JSON record in the RDD to a Map
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val myMap = mapper.readValue[Map[String, String]](x)
  val event =
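For context, a fuller sketch of that pattern, assuming lines is a DStream[String] of JSON records (the Jackson Scala module import paths vary by version, so treat them as approximate):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

    lines.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Build one mapper per partition: ObjectMapper is costly and not serializable
        val mapper = new ObjectMapper() with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        records.foreach { json =>
          val myMap = mapper.readValue[Map[String, String]](json)
          println(myMap)
        }
      }
    }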
Hi,
I am processing a bunch of HDFS data using the StreamingContext (Spark
1.1.0) which means that all files that exist in the directory at start()
time are processed in the first batch. Now when I try to stop this stream
processing using `streamingContext.stop(false, false)` (that is, even with
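For reference, the stop variant in question, sketched (ssc is the StreamingContext):

    // false, false: keep the underlying SparkContext alive, don't drain in-flight batches
    ssc.stop(stopSparkContext = false, stopGracefully = false)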
First of all, any action is only performed when you trigger a collect.
When you trigger collect, at that point it retrieves the data from disk, joins
the datasets together, and delivers it to you.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi
Hi,
I am trying to run a basic Twitter stream program but am getting blank
output. Please correct me if I am missing something.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.twitter.TwitterUtils
import
Yes, that is my understanding of how it should work.
But in my case, when I call collect the first time, it reads the data from the
files on disk.
Subsequent collect queries are not reading the data files (verified from the
logs).
On the Spark UI I see only shuffle read and no shuffle write.
Change this line
val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local").set("spark.eventLog.enabled", "true")
to
val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[4]").set("spark.eventLog.enabled", "true")
Thanks
Best Regards
On Thu,
Hi,
I have tried to run the basic streaming example (
https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html).
I have established two ssh connections to the machine where Spark is
installed. In one terminal, I have started netcat with the command
nc -lk
In the other terminal I have run
I suppose it means what it says: that it can't connect. But it's
strange to be unable to connect to a port on localhost.
What if you telnet to localhost on that port and type some text? Does it show
up in the nc output? If not, it's some other problem locally, like a
firewall, or nc not running, or
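For reference, the guide's example being run looks roughly like this (a sketch; it assumes the guide's port 9999 and a local master):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Connects to the port that `nc -lk 9999` is listening on
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()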
it was built with 1.6 (tried 1.7, too)
On Thu, Nov 13, 2014 at 2:52 AM, Andrew Or-2 [via Apache Spark User
List] ml-node+s1001560n18797...@n3.nabble.com wrote:
Hey Jamborta,
What Java version did you build the jar with?
2014-11-12 16:48 GMT-08:00 jamborta [hidden email]:
I have figured out
Let us say I have the following two RDDs, with the following key-value
pairs.
rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]
and
rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ]
Now, I want to join them by key, so for example I want to return the
following
Try nc -lp
Thanks
Best Regards
On Thu, Nov 13, 2014 at 3:36 PM, Niko Gamulin niko.gamu...@gmail.com
wrote:
Hi,
I have tried to run the basic streaming example (
https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html)
I have established two ssh connections to the machine
Check cogroup.
Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
http://in.linkedin.com/in/sonalgoyal
On Thu, Nov 13, 2014 at 5:11 PM, Blind Faith person.of.b...@gmail.com
wrote:
Let us say I have the following two RDDs, with the following key-value
pairs.
rdd1 =
Hi
I am getting the following error while running the
TwitterPopularTags example. I am using spark-1.1.0-bin-hadoop2.4.
jishnu@getafix:~/spark/bin$ run-example TwitterPopularTags *** ** ** *** **
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Hi
Thanks Akhil, you saved the day… It's working perfectly…
Regards
Jishnu Menath Prathap
From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, November 13, 2014 3:25 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: Akhil [via Apache Spark User List];
Hi,
I am receiving the following error when I try to run a sample Spark program.
Caused by: java.lang.UnsatisfiedLinkError:
Thanks Michael.
I used Parquet files, and they solved my initial problem to some
extent (i.e. loading data from one context and reading it from another
context).
But there I could see another issue. I need to load the parquet file every
time I create the JavaSQLContext using
nc returns an error if you do that. nc -lk is correct.
On Thu, Nov 13, 2014 at 11:46 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
Try nc -lp
Thanks
Best Regards
I think he's on an Ubuntu/Debian box.
Thanks
Best Regards
On Thu, Nov 13, 2014 at 6:23 PM, Sean Owen so...@cloudera.com wrote:
nc returns an error if you do that. nc -lk is correct.
On Thu, Nov 13, 2014 at 11:46 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:
Try nc -lp
Thanks
Best
I encounter no issues with streaming from Kafka to Spark in 1.1.0. Do you
perhaps have a version conflict?
Helena
On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote:
Yup, very important that n > 1 for Spark Streaming jobs. If running local, use
local[2].
The thing to remember is
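In code, that looks something like this (a sketch):

    // A receiver permanently occupies one core; with only local[1]
    // the batch processing itself would never get scheduled
    val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")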
I'm trying to understand the disk I/O patterns for Spark -- specifically, I'd
like to reduce the number of files being written during shuffle
operations. A couple of questions:
* Is the amount of file I/O performed independent of the memory I allocate
for the shuffles?
* If this is the
Hi, I'm using Spark 1.1.0. There is no error on the executors -- it appears
as if the job never gets properly dispatched -- the only message is the
Broken Pipe message in the driver.
Hi guys,
Were the Kafka examples in the master branch removed?
Thanks
Would it make sense to read each file in as a separate RDD? This way you
would be guaranteed the data is partitioned as you expected.
Possibly you could then repartition each of those RDDs into a single
partition and then union them. I think that would achieve what you expect.
But it would be
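A rough sketch of that approach, assuming the file paths are known up front (the paths here are hypothetical):

    // One RDD per file, each squeezed into a single partition
    val paths = Seq("hdfs:///data/file1.txt", "hdfs:///data/file2.txt")
    val perFile = paths.map(p => sc.textFile(p).coalesce(1))
    val combined = sc.union(perFile) // one partition per original file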
If your data is in HDFS and you are reading it as textFile, and each file is
less than the block size, my understanding is it would always have one
partition per file.
On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io
wrote:
Would it make sense to read each file in as a separate
I built my Spark Streaming app on my local machine, and an initial step in
log processing is filtering out rows with spam IPs. I use the following
code which works locally:
// Creates a HashSet for badIPs read in from file
val badIpSource = scala.io.Source.fromFile("wrongIPlist.csv")
I was running a proof of concept for my company with Spark Streaming, and
the conclusion I came to is that Spark collects data for the
batch duration, THEN starts the data-pipeline calculations.
My batch size was 5 minutes, and the CPU was all but dead for 5 minutes, then
when the 5 minutes were up the
1) You have a receiver thread. That thread might use a lot of CPU, or not,
depending on how you implement the thread in onStart.
2) Every 5 minutes, Spark will submit a job which processes
every RDD which was created (i.e. using the store() call) in the
receiver. That job will run asynchronously.
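A minimal sketch of that receiver structure (the data source here is hypothetical):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
      def onStart(): Unit = {
        // This thread runs continuously, independent of the batch jobs
        new Thread("My Receiver") {
          override def run(): Unit = {
            while (!isStopped()) {
              store(fetchNextRecord()) // store() hands records to Spark for the current batch
            }
          }
        }.start()
      }
      def onStop(): Unit = {}
      private def fetchNextRecord(): String = ??? // hypothetical data source
    }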
Yes. Data is collected for 5 minutes, then processing starts at the
end. The result may be an arbitrary function of the data in the
interval, so the interval has to finish before computation can start.
If you want more continuous processing, you can simply reduce the
batch interval to, say, 1
I believe Rishi is correct. I wouldn't rely on that though - all it would
take is for one file to exceed the block size and you'd be setting yourself
up for pain. Also, if your files are small - small enough to fit in a
single record - you could use SparkContext.wholeTextFiles.
On Thu, Nov 13,
Hi Arthur,
May I know what the solution is? I have similar requirements.
Regards,
Vasu C
On Sun, Oct 26, 2014 at 12:09 PM, arthur.hk.c...@gmail.com
arthur.hk.c...@gmail.com wrote:
Hi,
I have already found out how to “insert into HIVE_TABLE values
(…..)”
Regards
Arthur
On
I am seeing skewed execution times. As far as I can tell, they are
attributable to differences in data locality - tasks with locality
PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.
This seems entirely as it should be - the question is, why the different
locality levels?
I am
rdd1.union(rdd2).groupByKey()
On Thu, Nov 13, 2014 at 3:41 AM, Blind Faith person.of.b...@gmail.com wrote:
Let us say I have the following two RDDs, with the following key-value
pairs.
rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]
and
rdd2 = [ (key1, [value5,
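For reference, a sketch of both suggested approaches (union + groupByKey above, and the earlier cogroup suggestion); in compiled 1.x code the pair-RDD functions need the SparkContext._ import:

    import org.apache.spark.SparkContext._

    val rdd1 = sc.parallelize(Seq("key1" -> Seq("value1", "value2"), "key2" -> Seq("value3", "value4")))
    val rdd2 = sc.parallelize(Seq("key1" -> Seq("value5", "value6"), "key2" -> Seq("value7")))

    // Option 1: union then group; each key maps to an Iterable of the original lists
    val grouped = rdd1.union(rdd2).groupByKey()

    // Option 2: cogroup keeps the two sides separate per key
    val cogrouped = rdd1.cogroup(rdd2)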
Hi Patrick,
Although we are able to use Spark 1.1.0 with Play 2.2.x, as you mentioned, Akka
incompatibility prevents us from using Spark with the current stable releases
of Play (2.3.6) and Akka (2.3.7). Are there any plans to address this issue in
Spark 1.2?
Thanks,
Mohammed
From: John
Thanks Cheng. Just one more question - does that mean that we still need
enough memory in the cluster to uncompress the data before it can be
compressed again or does that just read the raw data as is?
On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:
Currently there’s
It seems SparkContext is a good fit to be used with 'with' in Python. A context
manager will do.
Example:
with SparkContext(conf=conf, batchSize=512) as sc:
Then it is no longer necessary to write sc.stop().
On Thu, Nov 13, 2014 at 11:02 AM, Sean Owen so...@cloudera.com wrote:
Yes. Data is collected for 5 minutes, then processing starts at the
end. The result may be an arbitrary function of the data in the
interval, so the interval has to finish before computation can start.
Thanks everyone.
You mentioned that the 3.1 min run was the one that did the actual caching,
so did that run before any data was cached, or after?
I would recommend checking the Storage tab of the UI, and clicking on the
RDD, to see both how full the executors' storage memory is (which may be
significantly less
Anyone experienced this before? Any help would be appreciated
This way?
scala> val epoch = System.currentTimeMillis
epoch: Long = 1415903974545

scala> val date = new Date(epoch)
date: java.util.Date = Fri Nov 14 00:09:34 IST 2014
Thanks
Best Regards
On Thu, Nov 13, 2014 at 10:17 PM, spr s...@yarcdata.com wrote:
Apologies for what seems an egregiously
I have large files that need to be imported into HDFS for further Spark
processing. Obviously, I can import them using hadoop fs; however, there is
some minor processing that needs to be performed around a few
transformations: stripping the header line, and other such stuff.
I would like to stay
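A sketch of the header-stripping step done in Spark itself (the paths are hypothetical):

    val raw = sc.textFile("file:///incoming/big.csv")
    // The header is the first line of the first partition
    val cleaned = raw.mapPartitionsWithIndex { (idx, iter) =>
      if (idx == 0) iter.drop(1) else iter
    }
    cleaned.saveAsTextFile("hdfs:///data/big")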
You could also use the jodatime library, which has a ton of great other
options in it.
J
*JIMMY MCERLAIN*
DATA SCIENTIST (NERD)
*. . . . . . . . . . . . . . . . . .*
*IF WE CAN’T DOUBLE YOUR SALES,*
*ONE OF US IS IN THE WRONG BUSINESS.*
*E*: ji...@sellpoints.com
*M*: *510.303.7751*
The following code fails with NullPointerException in RDD class on the
count function:
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(11 to 20)
rdd1.map { i =>
rdd2.count
}
.foreach(println(_))
The same goes for any other action I am trying to perform
Thanks for the thoughts. I've been testing on Spark 1.1 and haven't seen
the IndexError yet. I've run into some other errors (too many open
files), but these issues seem to have been discussed already. The dataset,
by the way, was about 40 GB and 188 million lines; I'm running a sort on 3
worker
For one of my Spark jobs, my workers/executors are dying and leaving the
cluster.
On the master, I see something like the following in the log file. I'm
surprised to see the '60' seconds in the master log below because I explicitly
set it to '600' (or so I thought) in my spark job (see
You cannot reference an RDD within a closure passed to another RDD. Your
code should instead look like this:
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(11 to 20)
val rdd2Count = rdd2.count
rdd1.map { i =>
rdd2Count
}
.foreach(println(_))
You
The errors may happen because there is not enough memory in the
worker, so it keeps spilling to many small files. Could you verify
that the PR [1] fixes your problem?
[1] https://github.com/apache/spark/pull/3252
On Thu, Nov 13, 2014 at 11:28 AM, santon steven.m.an...@gmail.com
Thanks for the responses, Daniel and Rishi.
No, I don't want separate RDDs, because each of these partitions is being
processed the same way (in my case, each partition corresponds to HBase
keys belonging to one region server, and I will do HBase lookups). After
that I have aggregations too, hence
If the file is not present on each node, it may not find it.
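A common fix is to read the file once on the driver and broadcast the resulting set (a sketch; logs and extractIP are hypothetical stand-ins for the poster's code):

    // Read on the driver, then ship the set to every executor once
    val badIPs = scala.io.Source.fromFile("wrongIPlist.csv").getLines().toSet
    val badIPsBC = sc.broadcast(badIPs)
    val clean = logs.filter(row => !badIPsBC.value.contains(extractIP(row)))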
On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com
wrote
No, I don't want separate RDDs, because each of these partitions is being
processed the same way (in my case, each partition corresponds to HBase
keys belonging to one region server, and I will do HBase lookups).
Hi,
I'm using GraphX and playing around with its PageRank algorithm. However, I
can't see from the documentation how to use edge weights when running PageRank.
Is it possible to consider edge weights, and how would I do it?
Thank you very much for your help and my best regards,
Jürgen
Hello,
I'm attempting to implement a clustering algorithm on top of the Pregel
implementation in GraphX; however, I'm hitting a wall. Ideally, I'd like to
be able to get all edges for a specific vertex, since they factor into the
calculation. My understanding was that the sendMsg function would receive
Hi All,
I use textFile to create an RDD. However, I don't want to process the whole
dataset in this RDD. For example, maybe I only want to process the data in the
3rd partition of the RDD.
How can I do it? Here are some possible solutions that I'm thinking of:
1. Create multiple RDDs when reading the file
2.
Why do you only want the third partition? You can access individual partitions
using the partitions() method. You can also filter your data using the
filter() function so it only contains the data you care about. Moreover, when
you create your RDDs, unless you define a custom partitioner, you have
The direct answer you are looking for may be in RDD.mapPartitionsWithIndex().
The better question is: why are you looking into only the 3rd partition? To
analyze a random sample? Then look into RDD.sample(). Are you sure the data
you are looking for is in the 3rd partition? What if you end up
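If it really must be one specific partition, a sketch with mapPartitionsWithIndex:

    // Keep only partition index 2 (partitions are zero-indexed, so this is the 3rd)
    val thirdOnly = rdd.mapPartitionsWithIndex { (idx, iter) =>
      if (idx == 2) iter else Iterator.empty
    }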
Does Spark JDBC thrift server allow connections over HTTP?
http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#running-the-thrift-jdbc-server
doesn't seem to indicate this feature.
If the feature isn't there, is it planned? Is there a tracking JIRA?
Thank you,
Vinay
Hi,
I think that there are two solutions:
1. Invalid edges send Iterator.empty messages in sendMsg of the Pregel API.
These messages have no effect on the corresponding vertices.
2. Use GraphOps.(collectNeighbors/collectNeighborIds), not the Pregel API,
so as to
handle active edge lists by
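A sketch of the second option (graph is the GraphX graph in question):

    import org.apache.spark.graphx._

    // All neighbor IDs per vertex, regardless of edge direction
    val neighborIds: VertexRDD[Array[VertexId]] =
      graph.collectNeighborIds(EdgeDirection.Either)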
Hi,
I am using the following code to generate the (score, count) for each
window:
val score_count_by_window = topic.map(r => r._2) // r._2 is the integer score
  .countByValue()
score_count_by_window.print()
Hi all,
Are there setup and cleanup functions in Spark, as in Hadoop MapReduce, which
do some initialization and cleanup work?
Best Regards,
Kevin.
Hi,
I guess I found part of the issue: I said
dstream.transform(rdd => { rdd.foreachPartition(...); rdd })
instead of
dstream.transform(rdd => { rdd.mapPartitions(...) }),
that's why stop() would not stop the processing.
Now with the new version a non-graceful shutdown works in the sense that
I would think this should be done at the application level.
After all, the core functionality of Spark Streaming is to capture RDDs in
some real-time interval and process them -
not to aggregate their results.
But maybe there is a better way...
On Thu, Nov 13, 2014 at 8:28 PM, SK
HTTP is not supported yet, and I don't think there's a JIRA ticket for it.
On 11/14/14 8:21 AM, vs wrote:
Does Spark JDBC thrift server allow connections over HTTP?
http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#running-the-thrift-jdbc-server
doesn't seem to indicate this
No, the columnar buffer is built in a small batching manner; the batch
size is controlled by the spark.sql.inMemoryColumnarStorage.batchSize
property. The default value for this in master and branch-1.2 is 10,000
rows per batch.
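A sketch of overriding that property, assuming a SQLContext named sqlContext:

    // Smaller batches lower the peak memory used while building the columnar buffers
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")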
On 11/14/14 1:27 AM, Sadhan Sood wrote:
Thanks Cheng. Just
If you're looking for executor-side setup and cleanup functions, there
ain't any yet, but you can achieve the same semantics via
RDD.mapPartitions.
Please check the “setup() and cleanup” section of this blog from
Cloudera for details:
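The shape of that trick, sketched (openConnection and process are hypothetical helpers):

    rdd.mapPartitions { iter =>
      val conn = openConnection() // setup: runs once per partition, on the executor
      // Materialize before closing, otherwise the connection is already gone
      // by the time the lazy iterator is actually consumed
      val out = iter.map(rec => process(conn, rec)).toVector
      conn.close() // cleanup
      out.iterator
    }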
Hi,
I am trying to save an RDD to an S3 bucket using the
RDD.saveAsSequenceFile(self, path, CompressionCodec) function in Python. I
need to save the RDD in GZIP. Can anyone tell me how to pass the gzip codec
class as a parameter to the function?
I tried
So can I write it like this?
rdd.mapPartitions(i => { setup(); i }).map(...).mapPartitions(i => { cleanup(); i })
So I don't need to mess up the logic and can still use map, filter and
other transformations on the RDD.
Jianshi
On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian lian.cs@gmail.com wrote:
If
Thank you,
Recompiling Spark was not as complicated as I feared, and it seems to work.
Since then we have decided to migrate to 5.2.0, so the problem was mitigated,
but if anyone else has this issue, I can verify this method works.
-Original Message-
From: Marcelo Vanzin
Hi Darin,
In our case, we were getting the error gue to long GC pauses in our app.
Fixing the underlying code helped us remove this error. This is also
mentioned as point 1 in the link below:
Hi,
I am running a pyspark job.
I need to serialize the final result to HDFS in binary files and have the
ability to give a name to the output files.
I found this post:
http://stackoverflow.com/questions/25293962/specifying-the-output-file-name-in-apache-spark
but it explains how to do it using Scala.
If you're just relying on the side effects of setup() and cleanup(),
then I think this trick is OK and pretty clean.
But if setup() returns, say, a DB connection, then the map(...) part
and cleanup() can't get the connection object.
On 11/14/14 1:20 PM, Jianshi Huang wrote:
So
OK, then we need another trick.
Let's have an implicit lazy var connection/context around our code. And
setup() will trigger the eval and initialization.
The implicit lazy val/var trick was actually invented by Kevin. :)
Jianshi
On Fri, Nov 14, 2014 at 1:41 PM, Cheng Lian
I wonder if SparkConf is dynamically updated on all worker nodes, or only
during initialization. It could be used to piggyback information.
Otherwise I guess you are stuck with Broadcast.
Primarily I have had these issues moving legacy MR operators to Spark, where
MR piggybacks on the Hadoop conf pretty
Hi,
I am using Spark 1.0.0 and Scala 2.10.3.
I want to use toLocalIterator in some code, but the spark shell tells me:
not found: value toLocalIterator
I also did import org.apache.spark.rdd, but even after this the shell tells me:
object toLocalIterator is not a member of package org.apache.spark.rdd
@Aris, we are closely following the PMML work that is going on and as
Xiangrui mentioned, it might be easier to migrate models such as logistic
regression and then migrate trees. Some of the models get fairly large (as
pointed out by Sung Chung) with deep trees as building blocks and we might
have
It looks like you are trying to directly import the toLocalIterator
function. You can't import functions, it should just appear as a
method of an existing RDD if you have one.
- Patrick
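That is, something like this (a sketch):

    val rdd = sc.parallelize(1 to 100, 4)
    // toLocalIterator is a method on the RDD itself; it fetches
    // one partition at a time back to the driver
    val it: Iterator[Int] = rdd.toLocalIterator
    it.take(5).foreach(println)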
On Thu, Nov 13, 2014 at 10:21 PM, Deep Pradhan
pradhandeep1...@gmail.com wrote:
Hi,
I am using Spark 1.0.0
Darin,
You might want to increase these config options also:
spark.akka.timeout 300
spark.storage.blockManagerSlaveTimeoutMs 30
On Thu, Nov 13, 2014 at 11:31 AM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:
For one of my Spark jobs, my workers/executors are dying and leaving the
One option may be to call HDFS tools or an HDFS client to rename them after saveAsXXXFile().
On Thu, Nov 13, 2014 at 9:39 PM, Oleg Ruchovets oruchov...@gmail.com wrote:
Hi ,
I am running pyspark job.
I need to serialize the final result to HDFS in binary files and have the
ability to give a name to the output
You could use the following as compressionCodecClass:
DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
gzip: org.apache.hadoop.io.compress.GzipCodec
bzip2: org.apache.hadoop.io.compress.BZip2Codec
LZO: com.hadoop.compression.lzo.LzopCodec
for gzip,