Lifecycle of a map function
Hi all, I'm trying to understand the lifecycle of a map function in a Spark/YARN context. My understanding is that the function is instantiated on the master and then passed to each executor (serialized/deserialized). What I'd like to confirm is that the function is initialized/loaded/deserialized once per executor (a JVM in YARN) and lives as long as the executor lives, and not once per task (the logical unit of work). Could you please explain or, better, give some links to source code or documentation? I've tried to take a look at Task.scala and ResultTask.scala, but I'm not familiar with Scala and didn't find where exactly the function lifecycle is managed. Thanks in advance, Vadim.
Pyspark error when converting string to timestamp in map function
Hi all, I'm trying to create a dataframe enforcing a schema so that I can write it to a parquet file. The schema has timestamps and I get an error with pyspark. The following snippet of code exhibits the problem:

df = sqlctx.range(1000)
schema = StructType([StructField('a', TimestampType(), True)])
df1 = sqlctx.createDataFrame(df.rdd.map(row_gen_func), schema)

row_gen_func is a function that returns timestamp strings of the form "2018-03-21 11:09:44". When I run this with Spark 2.2 I get the following error:

raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: TimestampType can not accept object '2018-03-21 08:06:17' in type

Regards, Keith. http://keith-chapman.com
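A likely fix, sketched below: TimestampType accepts Python datetime objects, not strings, so converting the string inside the mapped function before calling createDataFrame should resolve it. This is a pure-Python sketch; the helper name and the format string are assumptions based on the sample value above.

```python
from datetime import datetime

TS_FMT = "%Y-%m-%d %H:%M:%S"  # matches "2018-03-21 11:09:44"

def to_timestamp(s):
    # TimestampType can consume a datetime, but not a raw str,
    # so convert before the value reaches createDataFrame.
    return datetime.strptime(s, TS_FMT)

# In the pipeline this would look something like (hypothetical wrapper):
#   sqlctx.createDataFrame(df.rdd.map(lambda r: (to_timestamp(row_gen_func(r)),)), schema)
print(to_timestamp("2018-03-21 11:09:44"))
```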
Re: Issue with map function in Spark 2.2.0
As the error clearly says, column FL_DATE has a different format than you are expecting. Modify your date format mask accordingly. On Wed, 11 Apr 2018 at 5:12 pm, @Nandan@ wrote: > Hi , > I am not able to use .map function in Spark. > > My codes are as below :- > > *1) Create Parse function:-* > > from datetime import datetime > from collections import namedtuple > fields = > ('date','airline','flightnum','origin','dest','dep','dep_delay','arv','arv_delay','airtime','distance') > Flight = namedtuple('Flight',fields,verbose=True) > DATE_FMT = "%y-%m-%d" > TIME_FMT = "%H%M" > def parse(row) : > row[0] = datetime.strptime(row[0], DATE_FMT).date() > row[5] = datetime.strptime(row[5], TIME_FMT).time() > row[6] = float(row[6]) > row[7] = datetime.strptime(row[7], TIME_FMT).time() > row[8] = float(row[8]) > row[9] = float(row[9]) > row[10] = float(row[10]) > return Flight(*row[:11]) > > *2) Using Parse to parse my RDD* > > flightsParsedMap = flights.map(lambda x: x.split(',')).map(parse) > > *3) Checking Parsed RDD * > flightsParsedMap > *Output is :- * > > *PythonRDD[8] at RDD at PythonRDD.scala:48* > *4) Checking first row :-* > > flightsParsedMap.first() > Here i am getting issue:- > > > > ---Py4JJavaError > Traceback (most recent call > last) in ()> 1 > flightsParsedMap.first() > C:\spark\spark\python\pyspark\rdd.py in first(self) 1374 > ValueError: RDD is empty 1375 """-> 1376 rs = self.take(1) > 1377 if rs: 1378 return rs[0] > C:\spark\spark\python\pyspark\rdd.py in take(self, num) 13561357 > p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))-> > 1358 res = self.context.runJob(self, takeUpToNumLeft, p) 1359 > 1360 items += res > C:\spark\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, > partitions, allowLocal)999 # SparkContext#runJob. 
1000 > mappedRDD = rdd.mapPartitions(partitionFunc)-> 1001 port = > self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) > 1002 return list(_load_from_socket(port, > mappedRDD._jrdd_deserializer)) 1003 > C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in > __call__(self, *args) 1158 answer = > self.gateway_client.send_command(command) 1159 return_value = > get_return_value(-> 1160 answer, self.gateway_client, > self.target_id, self.name) 11611162 for temp_arg in temp_args: > C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw) 61 def > deco(*a, **kw): 62 try:---> 63 return f(*a, **kw) > 64 except py4j.protocol.Py4JJavaError as e: 65 s = > e.java_exception.toString() > C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in > get_return_value(answer, gateway_client, target_id, name)318 >raise Py4JJavaError(319 "An error occurred while > calling {0}{1}{2}.\n".--> 320 format(target_id, ".", > name), value)321 else:322 raise Py4JError( > Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 > (TID 9, localhost, executor driver): > org.apache.spark.api.python.PythonException: Traceback (most recent call > last): > File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, > in main > File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, > in process > File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line > 372, in dump_stream > vs = list(itertools.islice(iterator, batch)) > File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft > yield next(iterator) > File "", line 8, in parse > File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in > _strptime_datetime > tt, fraction = _strptime(data_string, format) > File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime > (data_string, format)) > ValueError:
Issue with map function in Spark 2.2.0
Hi ,
I am not able to use the .map function in Spark.

My code is as below:

*1) Create Parse function:-*

from datetime import datetime
from collections import namedtuple
fields = ('date','airline','flightnum','origin','dest','dep','dep_delay','arv','arv_delay','airtime','distance')
Flight = namedtuple('Flight',fields,verbose=True)
DATE_FMT = "%y-%m-%d"
TIME_FMT = "%H%M"
def parse(row) :
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    row[5] = datetime.strptime(row[5], TIME_FMT).time()
    row[6] = float(row[6])
    row[7] = datetime.strptime(row[7], TIME_FMT).time()
    row[8] = float(row[8])
    row[9] = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

*2) Using Parse to parse my RDD*

flightsParsedMap = flights.map(lambda x: x.split(',')).map(parse)

*3) Checking Parsed RDD*

flightsParsedMap

*Output is :-*
*PythonRDD[8] at RDD at PythonRDD.scala:48*

*4) Checking first row :-*

flightsParsedMap.first()

Here I am getting the issue:

---Py4JJavaError Traceback (most recent call last)
 in ()
----> 1 flightsParsedMap.first()
C:\spark\spark\python\pyspark\rdd.py in first(self)
   1374             ValueError: RDD is empty
   1375         """
-> 1376         rs = self.take(1)
   1377         if rs:
   1378             return rs[0]
C:\spark\spark\python\pyspark\rdd.py in take(self, num)
   1356
   1357                 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358                 res = self.context.runJob(self, takeUpToNumLeft, p)
   1359
   1360             items += res
C:\spark\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    999         # SparkContext#runJob. 
   1000         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
   1003
C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:
C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
    61     def deco(*a, **kw):
    62         try:
--> 63             return f(*a, **kw)
    64         except py4j.protocol.Py4JJavaError as e:
    65             s = e.java_exception.toString()
C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318             raise Py4JJavaError(
    319                 "An error occurred while calling {0}{1}{2}.\n".
--> 320                 format(target_id, ".", name), value)
    321         else:
    322             raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 9, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "", line 8, in parse
  File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in _strptime_datetime
    tt, fraction = _strptime(data_string, format)
  File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
    (data_string, format))
ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.
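The last line of the Python traceback shows parse choking on the literal value '"FL_DATE"', i.e. the CSV header row is being fed through strptime. A pure-Python sketch of the usual fix, filtering the header out before parsing (the sample lines are made up; in Spark the equivalent would be flights.filter(lambda x: x != header) before the map):

```python
from datetime import datetime
from collections import namedtuple

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)
DATE_FMT = "%y-%m-%d"
TIME_FMT = "%H%M"

def parse(row):
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    row[5] = datetime.strptime(row[5], TIME_FMT).time()
    row[6] = float(row[6])
    row[7] = datetime.strptime(row[7], TIME_FMT).time()
    row[8] = float(row[8])
    row[9] = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

# Made-up sample: a header row followed by one data row.
lines = [
    '"FL_DATE","AIRLINE","FLIGHTNUM","ORIGIN","DEST","DEP","DEP_DELAY","ARV","ARV_DELAY","AIRTIME","DISTANCE"',
    '14-03-01,AA,1,JFK,LAX,0900,5.0,1205,3.0,385.0,2475.0',
]
header = lines[0]
# Spark equivalent: flights.filter(lambda x: x != header).map(lambda x: x.split(',')).map(parse)
parsed = [parse(x.split(',')) for x in lines if x != header]
print(parsed[0].origin, parsed[0].distance)
```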
Resource management inside map function
Hi, I have a Spark job which needs to access HBase inside a mapToPair function. The question is that I do not want to connect to HBase and close the connection for every record. As I understand, PairFunction is not designed to manage resources with setup() and close(), like Hadoop readers and writers. Does Spark support this kind of resource management? Your help is appreciated! By the way, the reason I do not want to use a writer is that I want to return some metric values after processing. The returned metric values will be further processed. Basically, it is not desirable to use HDFS as a transfer location. Thanks, Huiliang
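Spark's usual answer here is mapPartitions / mapPartitionsToPair: the function receives an iterator over a whole partition, so a connection can be opened once per partition and closed when the iterator is exhausted. A pure-Python sketch of the pattern, with a hypothetical stand-in class replacing the real HBase client:

```python
class FakeHBaseConnection:
    """Hypothetical stand-in for a real HBase client, for illustration only."""
    def __init__(self):
        self.closed = False

    def lookup(self, key):
        return len(key)  # pretend this fetches a value for the key

    def close(self):
        self.closed = True

def process_partition(rows):
    # mapPartitions hands the function an iterator over a whole partition,
    # so the connection is opened once here, not once per record...
    conn = FakeHBaseConnection()
    try:
        for row in rows:
            yield (row, conn.lookup(row))  # emit (key, value) pairs
    finally:
        conn.close()  # ...and closed once the partition is drained

# In Spark (Java API) this would be rdd.mapPartitionsToPair(...);
# here, plain iteration stands in for one partition:
pairs = list(process_partition(["a", "bb", "ccc"]))
print(pairs)
```

Because the function returns an iterator, metric values can also be emitted alongside the data pairs rather than written out through HDFS.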
Re: Spark SQL within a DStream map function
Do you really need to create a DStream from the original messaging queue? Can't you just read them in a while loop or something on the driver? On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo wrote: > Hello, > > I have a web application that publishes JSON messages on to a messaging > queue that contain metadata and a link to a CSV document on S3. I'd like > to iterate over these JSON messages, and for each one pull the CSV document > into spark SQL to transform it (based on the metadata in the JSON message) > and output the results to a search index. Each file on S3 has different > headers, potentially different delimiters, and differing numbers of rows. > > Basically what I'm trying to do is something like this: > > JavaDStream parsedMetadataAndRows = > queueStream.map(new Function() { > @Override > ParsedDocument call(String metadata) throws Exception { > Map gson = new Gson().fromJson(metadata, Map.class) > > // get metadata from gson > String s3Url = gson.url > String delimiter = gson.delimiter > // etc... > > // read s3Url > Dataset dataFrame = sqlContext.read() > .format("com.databricks.spark.csv") > .option("delimiter", delimiter) > .option("header", true) > .option("inferSchema", true) > .load(url) > > // process document, > ParsedDocument docPlusRows = //... > > return docPlusRows > }) > > JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows, > "index/type" // ... > > > But it appears I cannot get access to the sqlContext when I run this on > the spark cluster because that code is executing in the executor not in the > driver. > > Is there a way I can access or create a SqlContext to be able to pull the > file down from S3 in my map function? Or do you have any recommendations > as to how I could set up a streaming job in a different way that would > allow me to accept metadata on the stream of records coming in and pull > each file down from s3 for processing? > > Thanks in advance for your help! > > Mike >
Spark SQL within a DStream map function
Hello,

I have a web application that publishes JSON messages on to a messaging queue that contain metadata and a link to a CSV document on S3. I'd like to iterate over these JSON messages, and for each one pull the CSV document into Spark SQL to transform it (based on the metadata in the JSON message) and output the results to a search index. Each file on S3 has different headers, potentially different delimiters, and differing numbers of rows.

Basically what I'm trying to do is something like this:

JavaDStream parsedMetadataAndRows = queueStream.map(new Function() {
    @Override
    ParsedDocument call(String metadata) throws Exception {
        Map gson = new Gson().fromJson(metadata, Map.class)

        // get metadata from gson
        String s3Url = gson.url
        String delimiter = gson.delimiter
        // etc...

        // read s3Url
        Dataset dataFrame = sqlContext.read()
            .format("com.databricks.spark.csv")
            .option("delimiter", delimiter)
            .option("header", true)
            .option("inferSchema", true)
            .load(url)

        // process document,
        ParsedDocument docPlusRows = //...

        return docPlusRows
    })

JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows, "index/type" // ...

But it appears I cannot get access to the sqlContext when I run this on the Spark cluster, because that code is executing in the executor, not in the driver.

Is there a way I can access or create a SQLContext to be able to pull the file down from S3 in my map function? Or do you have any recommendations as to how I could set up a streaming job in a different way that would allow me to accept metadata on the stream of records coming in and pull each file down from S3 for processing?

Thanks in advance for your help!

Mike
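Since the SQLContext only exists on the driver, one option (as the reply above suggests) is to consume the queue messages on the driver and do the CSV read there. A driver-side sketch of the metadata step, in plain Python; the field names url and delimiter are taken from the snippet above, and the Spark read itself is left as a pseudocode comment:

```python
import json

def extract_csv_options(metadata):
    # Pull the CSV location and parse options out of one queue message.
    doc = json.loads(metadata)
    return doc["url"], doc.get("delimiter", ",")

msg = '{"url": "s3://bucket/doc.csv", "delimiter": "|"}'
url, delim = extract_csv_options(msg)
# Driver-side read (Spark pseudocode):
#   sqlContext.read().option("delimiter", delim).option("header", True).csv(url)
print(url, delim)
```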
Re: create column with map function apply to dataframe
If I understand your question, you should look at withColumn in the DataFrame API: df.withColumn("len", length(df["l"])) (using length from pyspark.sql.functions; the Python built-in len won't work on a column). Thanks Ankur On Fri, Apr 14, 2017 at 6:07 AM, issues solution wrote: > Hi , > how you can create column inside map function > > > like that : > > df.map(lambd l : len(l) ) . > > but instead return rdd we create column insde data frame . >
create column with map function apply to dataframe
Hi , how can you create a column inside a map function, like this: df.map(lambda l: len(l)), but instead of returning an RDD we create a column inside the data frame?
Re: Reference External Variables in Map Function (Inner class)
Hello Marcelo, Finally what was the solution, I face the same problem. Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reference-External-Variables-in-Map-Function-Inner-class-tp11990p28237.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Why does the foreachPartition function make duplicate invocations to the map function for every message? (Spark 2.0.2)
Please post a minimal complete code example of what you are talking about On Thu, Dec 15, 2016 at 6:00 PM, Michael Nguyen wrote: > I have the following sequence of Spark Java API calls (Spark 2.0.2): > > Kafka stream that is processed via a map function, which returns the string > value from tuple2._2() for JavaDStream as in > > return tuple2._2(); > > The returned JavaDStream is then processed by foreachPartition, which is > wrapped by foreachRDD. > > foreachPartition's call function does Iterator on the RDD as in > inputRDD.next (); > > When data is received, step 1 is executed, which is correct. However, > inputRDD.next () in step 3 makes a duplicate call to the map function in > step 1. So that map function is called twice for every message: > > - the first time when the message is received from the Kafka stream, and > > - the second time when Iterator inputParams.next () is invoked from > foreachPartition's call function. > > I also tried transforming the data in the map function as in > > public TestTransformedClass call(Tuple2 tuple2) for step 1 > > public void call(Iterator inputParams) for step 3 > > and the same issue occurs. So this issue occurs, no matter whether this > sequence of Spark API calls involves data transformation or not. > > Questions: > > Since the message was already processed in step 1, why does inputRDD.next () > in step 3 makes a duplicate call to the map function in step 1 ? > > How do I fix it to avoid duplicate invocation for every message ? > > Thanks. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Why does the foreachPartition function make duplicate invocations to the map function for every message? (Spark 2.0.2)
I have the following sequence of Spark Java API calls (Spark 2.0.2):

1. A Kafka stream that is processed via a map function, which returns the string value from tuple2._2() for JavaDStream, as in return tuple2._2();

2. The returned JavaDStream is then processed by foreachPartition, which is wrapped by foreachRDD.

3. foreachPartition's call function iterates over the RDD, as in inputRDD.next();

When data is received, step 1 is executed, which is correct. However, inputRDD.next() in step 3 makes a duplicate call to the map function in step 1, so the map function is called twice for every message:

- the first time when the message is received from the Kafka stream, and

- the second time when Iterator inputParams.next() is invoked from foreachPartition's call function.

I also tried transforming the data in the map function, as in

public TestTransformedClass call(Tuple2 tuple2) for step 1

public void call(Iterator inputParams) for step 3

and the same issue occurs. So this issue occurs whether or not this sequence of Spark API calls involves data transformation.

Questions:

1. Since the message was already processed in step 1, why does inputRDD.next() in step 3 make a duplicate call to the map function in step 1?

2. How do I fix it to avoid duplicate invocation for every message?

Thanks.
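One likely explanation (hedged, since the full code isn't shown): RDD/DStream lineage is lazy and recomputed for each action unless the data is cached, so persisting the mapped stream (e.g. with .cache() or .persist()) before the foreachRDD should remove the second invocation. A pure-Python analogue of that recomputation, with a rebuilt generator pipeline standing in for an uncached RDD and a counter standing in for the map function:

```python
call_count = {"map": 0}

def mapper(x):
    # Stand-in for the Kafka map function; counts invocations.
    call_count["map"] += 1
    return x * 2

source = [1, 2, 3]

def mapped():
    # Like an uncached RDD lineage: each "action" rebuilds and
    # re-runs the whole pipeline, including the map step.
    return (mapper(x) for x in source)

list(mapped())  # first action
list(mapped())  # second action recomputes the map
print(call_count["map"])  # each of the 3 elements was mapped twice
```

Materializing the result once and reusing it (the analogue of rdd.cache()) would drop the count to one call per element.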
Re: [DataFrames] map function - 2.0
Experimental in Spark really just means that we are not promising binary compatibility for those functions in the 2.x release line. For Datasets in particular, we want a few releases to make sure the APIs don't have any major gaps before removing the experimental tag. On Thu, Dec 15, 2016 at 1:17 PM, Ninad Shringarpure wrote: > Hi Team, > > When going through Dataset class for Spark 2.0 it comes across that both > overloaded map functions with encoder and without are marked as > experimental. > > Is there a reason and issues that developers whould be aware of when using > this for production applications. Also is there a "non-experimental" way of > using map function on Dataframe in Spark 2.0 > > Thanks, > Ninad >
[DataFrames] map function - 2.0
Hi Team, When going through the Dataset class for Spark 2.0, I noticed that both overloaded map functions, with encoder and without, are marked as experimental. Is there a reason, and are there issues that developers should be aware of when using this for production applications? Also, is there a "non-experimental" way of using the map function on a DataFrame in Spark 2.0? Thanks, Ninad
Re: How to return a case class in map function?
2.0.1 has fixed the bug. Thanks very much. On Thu, Nov 3, 2016 at 6:22 PM, 颜发才(Yan Facai) wrote: > Thanks, Armbrust. > I'm using 2.0.0. > Does 2.0.1 stable version fix it? > > On Thu, Nov 3, 2016 at 2:01 AM, Michael Armbrust > wrote: > >> Thats a bug. Which version of Spark are you running? Have you tried >> 2.0.2? >> >> On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai) wrote: >> >>> Hi, all. >>> When I use a case class as return value in map function, spark always >>> raise a ClassCastException. >>> >>> I write an demo, like: >>> >>> scala> case class Record(key: Int, value: String) >>> >>> scala> case class ID(key: Int) >>> >>> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF >>> >>> scala> df.map{x => ID(x.getInt(0))}.show >>> >>> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 >>> (TID 175) >>> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast >>> to $line1401.$read$$iw$$iw$ID >>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen >>> eratedIterator.processNext(Unknown Source) >>> >>> >>> Please tell me if I'm wrong. >>> Thanks. >>> >>> >> >> >
Re: How to return a case class in map function?
Thanks, Armbrust. I'm using 2.0.0. Does 2.0.1 stable version fix it? On Thu, Nov 3, 2016 at 2:01 AM, Michael Armbrust wrote: > Thats a bug. Which version of Spark are you running? Have you tried > 2.0.2? > > On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai) wrote: > >> Hi, all. >> When I use a case class as return value in map function, spark always >> raise a ClassCastException. >> >> I write an demo, like: >> >> scala> case class Record(key: Int, value: String) >> >> scala> case class ID(key: Int) >> >> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF >> >> scala> df.map{x => ID(x.getInt(0))}.show >> >> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 >> (TID 175) >> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast >> to $line1401.$read$$iw$$iw$ID >> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen >> eratedIterator.processNext(Unknown Source) >> >> >> Please tell me if I'm wrong. >> Thanks. >> >> > >
Re: How to return a case class in map function?
That's a bug. Which version of Spark are you running? Have you tried 2.0.2? On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai) wrote: > Hi, all. > When I use a case class as return value in map function, spark always > raise a ClassCastException. > > I write an demo, like: > > scala> case class Record(key: Int, value: String) > > scala> case class ID(key: Int) > > scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF > > scala> df.map{x => ID(x.getInt(0))}.show > > 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 > (TID 175) > java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to > $line1401.$read$$iw$$iw$ID > at org.apache.spark.sql.catalyst.expressions.GeneratedClass$ > GeneratedIterator.processNext(Unknown Source) > > > Please tell me if I'm wrong. > Thanks. > >
How to return a case class in map function?
Hi, all.
When I use a case class as the return value in a map function, Spark always raises a ClassCastException.

I wrote a demo, like:

scala> case class Record(key: Int, value: String)
scala> case class ID(key: Int)
scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF
scala> df.map{x => ID(x.getInt(0))}.show

16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 (TID 175)
java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to $line1401.$read$$iw$$iw$ID
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

Please tell me if I'm wrong.
Thanks.
Re: Is there a way to dynamically load files [ parquet or csv ] in the map function?
Yes. You can do something like this: .map(x => mapfunction(x)) Thanks Deepak On 9 Jul 2016 9:22 am, "charles li" wrote: > > hi, guys, is there a way to dynamic load files within the map function. > > i.e. > > Can I code as bellow: > > > > > thanks a lot. > > > > -- > *___* > > Quant | Engineer | Boy > *___* > *blog*:http://litaotao.github.io > *github*: www.github.com/litaotao >
Is there a way to dynamically load files [ parquet or csv ] in the map function?
hi, guys, is there a way to dynamically load files within the map function?

i.e. Can I code as below:

thanks a lot.

--
*___*
Quant | Engineer | Boy
*___*
*blog*: http://litaotao.github.io
*github*: www.github.com/litaotao
Re: DataFrame Find/Filter Based on Input - Inside Map function
You can keep a joined dataset cached and filter that joined df with your filter condition On 2 Jul 2015 15:01, "Mailing List" wrote: > I need to pass the value of the filter dynamically like where id= > and that someVal exist in another RDD. > > How can I do this across JavaRDD and DataFrame ? > > Sent from my iPad > > On Jul 2, 2015, at 12:49 AM, ayan guha wrote: > > You can directly use filter on a Dataframe > On 2 Jul 2015 12:15, "Ashish Soni" wrote: > >> Hi All , >> >> I have an DataFrame Created as below >> >> options.put("dbtable", "(select * from user) as account"); >> DataFrame accountRdd = >> sqlContext.read().format("jdbc").options(options).load(); >> >> and i have another RDD which contains login name and i want to find the >> userid from above DF RDD and return it >> >> Not sure how can i do that as when i apply a map function and say filter >> on DF i get Null pointor exception. >> >> Please help. >> >> >>
Re: DataFrame Find/Filter Based on Input - Inside Map function
I need to pass the value of the filter dynamically like where id= and that someVal exist in another RDD. How can I do this across JavaRDD and DataFrame ? Sent from my iPad > On Jul 2, 2015, at 12:49 AM, ayan guha wrote: > > You can directly use filter on a Dataframe > >> On 2 Jul 2015 12:15, "Ashish Soni" wrote: >> Hi All , >> >> I have an DataFrame Created as below >> >> options.put("dbtable", "(select * from user) as account"); >> DataFrame accountRdd = >> sqlContext.read().format("jdbc").options(options).load(); >> >> and i have another RDD which contains login name and i want to find the >> userid from above DF RDD and return it >> >> Not sure how can i do that as when i apply a map function and say filter on >> DF i get Null pointor exception. >> >> Please help.
Re: DataFrame Find/Filter Based on Input - Inside Map function
You can directly use filter on a Dataframe On 2 Jul 2015 12:15, "Ashish Soni" wrote: > Hi All , > > I have an DataFrame Created as below > > options.put("dbtable", "(select * from user) as account"); > DataFrame accountRdd = > sqlContext.read().format("jdbc").options(options).load(); > > and i have another RDD which contains login name and i want to find the > userid from above DF RDD and return it > > Not sure how can i do that as when i apply a map function and say filter > on DF i get Null pointor exception. > > Please help. > > >
DataFrame Find/Filter Based on Input - Inside Map function
Hi All,

I have a DataFrame created as below:

options.put("dbtable", "(select * from user) as account");
DataFrame accountRdd = sqlContext.read().format("jdbc").options(options).load();

and I have another RDD which contains login names, and I want to find the userid from the above DF and return it.

Not sure how I can do that, as when I apply a map function and, say, a filter on the DF I get a NullPointerException.

Please help.
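The NullPointerException is the usual symptom of touching a DataFrame inside a map function on the executors. A common workaround, sketched here in plain Python with illustrative names: collect the login names to the driver (or broadcast them) and filter the account rows with a set-membership test, or use a join as the reply suggests.

```python
# Rows from the "account" table as (userid, login); names are illustrative.
accounts = [(1, "alice"), (2, "bob"), (3, "carol")]

# Login names from the other RDD, collected to the driver (or broadcast).
logins = {"bob", "carol"}

# In Spark this would be a join, or accountRdd.filter(col("login").isin(list(logins)));
# the membership test itself is just:
matches = [userid for userid, login in accounts if login in logins]
print(matches)
```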
RE: Can a map function return null
In fact you can return “NULL” from your initial map and hence not resort to Optional at all From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Sunday, April 19, 2015 9:48 PM To: 'Steve Lewis' Cc: 'Olivier Girardot'; 'user@spark.apache.org' Subject: RE: Can a map function return null Well you can do another map to turn Optional into String as in the cases when Optional is empty you can store e.g. “NULL” as the value of the RDD element If this is not acceptable (based on the objectives of your architecture) and IF when returning plain null instead of Optional does throw Spark exception THEN as far as I am concerned, chess-mate From: Steve Lewis [mailto:lordjoe2...@gmail.com] Sent: Sunday, April 19, 2015 8:16 PM To: Evo Eftimov Cc: Olivier Girardot; user@spark.apache.org Subject: Re: Can a map function return null So you imagine something like this: JavaRDD words = ... JavaRDD< Optional> wordsFiltered = words.map(new Function>() { @Override public Optional call(String s) throws Exception { if ((s.length()) % 2 == 1) // drop strings of odd length return Optional.empty(); else return Optional.of(s); } }); That seems to return the wrong type a JavaRDD< Optional> which cannot be used as a JavaRDD which is what the next step expects On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov wrote: I am on the move at the moment so i cant try it immediately but from previous memory / experience i think if you return plain null you will get a spark exception Anyway yiu can try it and see what happens and then ask the question If you do get exception try Optional instead of plain null Sent from Samsung Mobile Original message From: Olivier Girardot Date:2015/04/18 22:04 (GMT+00:00) To: Steve Lewis ,user@spark.apache.org Subject: Re: Can a map function return null You can return an RDD with null values inside, and afterwards filter on "item != null" In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. 
Example: sc.parallelize(1 to 1000).flatMap(item => if (item % 2 ==0) Some(item) else None).collect() res0: Array[Int] = Array(2, 4, 6, ) Regards, Olivier. On Sat, Apr 18, 2015 at 20:44, Steve Lewis wrote: I find a number of cases where I have an JavaRDD and I wish to transform the data and depending on a test return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatmap returning a list with 0 or 1 entry depending on the isUsed function. JavaRDD original = ... JavaRDD words = original.flatMap(new FlatMapFunction() { @Override public Iterable call(final Foo s) throws Exception { List ret = new ArrayList(); if(isUsed(s)) ret.add(transform(s)); return ret; // contains 0 items if isUsed is false } }); My question is can I do a map returning the transformed data and null if nothing is to be returned. as shown below - what does a Spark do with a map function returning null JavaRDD words = original.map(new MapFunction() { @Override Foo call(final Foo s) throws Exception { List ret = new ArrayList(); if(isUsed(s)) return transform(s); return null; // not used - what happens now } }); -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
RE: Can a map function return null
Well you can do another map to turn Optional into String as in the cases when Optional is empty you can store e.g. “NULL” as the value of the RDD element If this is not acceptable (based on the objectives of your architecture) and IF when returning plain null instead of Optional does throw Spark exception THEN as far as I am concerned, chess-mate From: Steve Lewis [mailto:lordjoe2...@gmail.com] Sent: Sunday, April 19, 2015 8:16 PM To: Evo Eftimov Cc: Olivier Girardot; user@spark.apache.org Subject: Re: Can a map function return null So you imagine something like this: JavaRDD words = ... JavaRDD< Optional> wordsFiltered = words.map(new Function>() { @Override public Optional call(String s) throws Exception { if ((s.length()) % 2 == 1) // drop strings of odd length return Optional.empty(); else return Optional.of(s); } }); That seems to return the wrong type a JavaRDD< Optional> which cannot be used as a JavaRDD which is what the next step expects On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov wrote: I am on the move at the moment so i cant try it immediately but from previous memory / experience i think if you return plain null you will get a spark exception Anyway yiu can try it and see what happens and then ask the question If you do get exception try Optional instead of plain null Sent from Samsung Mobile Original message From: Olivier Girardot Date:2015/04/18 22:04 (GMT+00:00) To: Steve Lewis ,user@spark.apache.org Subject: Re: Can a map function return null You can return an RDD with null values inside, and afterwards filter on "item != null" In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Exemple : sc.parallelize(1 to 1000).flatMap(item => if (item % 2 ==0) Some(item) else None).collect() res0: Array[Int] = Array(2, 4, 6, ) Regards, Olivier. Le sam. 18 avr. 
2015 à 20:44, Steve Lewis a écrit : I find a number of cases where I have an JavaRDD and I wish to transform the data and depending on a test return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatmap returning a list with 0 or 1 entry depending on the isUsed function. JavaRDD original = ... JavaRDD words = original.flatMap(new FlatMapFunction() { @Override public Iterable call(final Foo s) throws Exception { List ret = new ArrayList(); if(isUsed(s)) ret.add(transform(s)); return ret; // contains 0 items if isUsed is false } }); My question is can I do a map returning the transformed data and null if nothing is to be returned. as shown below - what does a Spark do with a map function returning null JavaRDD words = original.map(new MapFunction() { @Override Foo call(final Foo s) throws Exception { List ret = new ArrayList(); if(isUsed(s)) return transform(s); return null; // not used - what happens now } }); -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Can a map function return null
So you imagine something like this:

JavaRDD<String> words = ...
JavaRDD<Optional<String>> wordsFiltered = words.map(new Function<String, Optional<String>>() {
    @Override
    public Optional<String> call(String s) throws Exception {
        if ((s.length()) % 2 == 1) // drop strings of odd length
            return Optional.empty();
        else
            return Optional.of(s);
    }
});

That seems to return the wrong type, a JavaRDD<Optional<String>>, which cannot be used as the JavaRDD<String> that the next step expects.

On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov wrote:
> I am on the move at the moment so I can't try it immediately, but from
> previous memory/experience I think if you return plain null you will get
> a Spark exception.
>
> Anyway, you can try it and see what happens, and then ask the question.
>
> If you do get an exception, try Optional instead of plain null.
>
> Sent from Samsung Mobile
>
> Original message
> From: Olivier Girardot
> Date: 2015/04/18 22:04 (GMT+00:00)
> To: Steve Lewis, user@spark.apache.org
> Subject: Re: Can a map function return null
>
> You can return an RDD with null values inside, and afterwards filter on
> "item != null". In Scala (or even in Java 8) you'd rather use
> Option/Optional, and in Scala they're directly usable from Spark.
> Example:
>
> sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item)
> else None).collect()
>
> res0: Array[Int] = Array(2, 4, 6, …)
>
> Regards,
> Olivier.
>
> On Sat, 18 Apr 2015 at 20:44, Steve Lewis wrote:
>
>> I find a number of cases where I have a JavaRDD and I wish to transform
>> the data and, depending on a test, return zero or one item (don't suggest a
>> filter - the real case is more complex). So I currently do something like
>> the following - perform a flatMap returning a list with 0 or 1 entries
>> depending on the isUsed function:
>>
>> JavaRDD<Foo> original = ...
>> JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
>>     @Override
>>     public Iterable<Foo> call(final Foo s) throws Exception {
>>         List<Foo> ret = new ArrayList<Foo>();
>>         if (isUsed(s))
>>             ret.add(transform(s));
>>         return ret; // contains 0 items if isUsed is false
>>     }
>> });
>>
>> My question is: can I do a map returning the transformed data and null if
>> nothing is to be returned, as shown below - what does Spark do with a map
>> function returning null?
>>
>> JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
>>     @Override
>>     public Foo call(final Foo s) throws Exception {
>>         if (isUsed(s))
>>             return transform(s);
>>         return null; // not used - what happens now?
>>     }
>> });
>>
>> -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Can a map function return null
I am on the move at the moment so I can't try it immediately, but from previous memory/experience I think if you return plain null you will get a Spark exception. Anyway, you can try it and see what happens, and then ask the question. If you do get an exception, try Optional instead of plain null.

Sent from Samsung Mobile

Original message From: Olivier Girardot Date: 2015/04/18 22:04 (GMT+00:00) To: Steve Lewis, user@spark.apache.org Subject: Re: Can a map function return null

You can return an RDD with null values inside, and afterwards filter on "item != null". In Scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Example:

sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect()
res0: Array[Int] = Array(2, 4, 6, …)

Regards, Olivier.

On Sat, 18 Apr 2015 at 20:44, Steve Lewis wrote: I find a number of cases where I have a JavaRDD and I wish to transform the data and, depending on a test, return zero or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatMap returning a list with 0 or 1 entries depending on the isUsed function:

JavaRDD<Foo> original = ...
JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
    @Override
    public Iterable<Foo> call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            ret.add(transform(s));
        return ret; // contains 0 items if isUsed is false
    }
});

My question is: can I do a map returning the transformed data and null if nothing is to be returned, as shown below - what does Spark do with a map function returning null?

JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
    @Override
    public Foo call(final Foo s) throws Exception {
        if (isUsed(s))
            return transform(s);
        return null; // not used - what happens now?
    }
});
Re: Can a map function return null
You can return an RDD with null values inside, and afterwards filter on "item != null". In Scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Example:

sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect()

res0: Array[Int] = Array(2, 4, 6, …)

Regards,

Olivier.

On Sat, 18 Apr 2015 at 20:44, Steve Lewis wrote:
> I find a number of cases where I have a JavaRDD and I wish to transform
> the data and, depending on a test, return zero or one item (don't suggest a
> filter - the real case is more complex). So I currently do something like
> the following - perform a flatMap returning a list with 0 or 1 entries
> depending on the isUsed function:
>
> JavaRDD<Foo> original = ...
> JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
>     @Override
>     public Iterable<Foo> call(final Foo s) throws Exception {
>         List<Foo> ret = new ArrayList<Foo>();
>         if (isUsed(s))
>             ret.add(transform(s));
>         return ret; // contains 0 items if isUsed is false
>     }
> });
>
> My question is: can I do a map returning the transformed data and null if
> nothing is to be returned, as shown below - what does Spark do with a map
> function returning null?
>
> JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
>     @Override
>     public Foo call(final Foo s) throws Exception {
>         if (isUsed(s))
>             return transform(s);
>         return null; // not used - what happens now?
>     }
> });
Can a map function return null
I find a number of cases where I have a JavaRDD and I wish to transform the data and, depending on a test, return zero or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatMap returning a list with 0 or 1 entries depending on the isUsed function:

JavaRDD<Foo> original = ...
JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
    @Override
    public Iterable<Foo> call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            ret.add(transform(s));
        return ret; // contains 0 items if isUsed is false
    }
});

My question is: can I do a map returning the transformed data and null if nothing is to be returned, as shown below - what does Spark do with a map function returning null?

JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
    @Override
    public Foo call(final Foo s) throws Exception {
        if (isUsed(s))
            return transform(s);
        return null; // not used - what happens now?
    }
});
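For readers skimming the archive: the flatMap pattern discussed in this thread can be sketched without Spark in plain Python, where an empty list drops an element and a one-element list keeps the transformed value. Here `is_used` and `transform` are hypothetical stand-ins for the poster's functions:

```python
def is_used(s):
    # hypothetical predicate: keep strings of even length
    return len(s) % 2 == 0

def transform(s):
    # hypothetical transformation
    return s.upper()

def flat_map(func, items):
    # mimics RDD.flatMap: func returns a list of 0..n results per element
    return [out for item in items for out in func(item)]

words = ["ab", "abc", "abcd"]
kept = flat_map(lambda s: [transform(s)] if is_used(s) else [], words)
print(kept)  # ['AB', 'ABCD'] - "abc" was dropped without any null in the output
```

The point of the pattern is that no null/None ever enters the collection: the element simply contributes zero outputs.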
Re: using sparkContext from within a map function (from spark streaming app)
Yes, you can never use the SparkContext inside a remote function. It is on the driver only. On Sun, Mar 8, 2015 at 4:22 PM, Daniel Haviv wrote: > Hi, > We are designing a solution which pulls file paths from Kafka and for the > current stage just counts the lines in each of these files. > When running the code it fails on: > Exception in thread "main" org.apache.spark.SparkException: Task not > serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) > at org.apache.spark.SparkContext.clean(SparkContext.scala:1478) > at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444) > at > org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146) > at > org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46) > at streamReader.App.main(App.java:66) > > Is using the sparkContext from inside a map function wrong ? 
>
> This is the code we are using:
>
> SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("spark://namenode:7077");
>
> // KAFKA
> final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> topicMap.put("uploadedFiles", 1);
> JavaPairReceiverInputDStream<String, String> messages =
>     KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);
>
> JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
>
> JavaPairDStream<String, Integer> pairs = messages.mapToPair(
>     new PairFunction<Tuple2<String, String>, String, Integer>() {
>         public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
>             JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
>             int test = new Long(textfile.count()).intValue();
>             return new Tuple2<String, Integer>(word._2(), test);
>         }
>     });
>
> System.out.println("Printing Messages:");
> pairs.print();
>
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
>
> Thanks,
> Daniel

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
using sparkContext from within a map function (from spark streaming app)
Hi, We are designing a solution which pulls file paths from Kafka and, for the current stage, just counts the lines in each of these files. When running the code it fails on:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
at streamReader.App.main(App.java:66)

Is using the sparkContext from inside a map function wrong?

This is the code we are using:

SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("spark://namenode:7077");

// KAFKA
final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("uploadedFiles", 1);
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaPairDStream<String, Integer> pairs = messages.mapToPair(
    new PairFunction<Tuple2<String, String>, String, Integer>() {
        public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
            JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
            int test = new Long(textfile.count()).intValue();
            return new Tuple2<String, Integer>(word._2(), test);
        }
    });

System.out.println("Printing Messages:");
pairs.print();

jssc.start();
jssc.awaitTermination();
jssc.close();

Thanks, Daniel
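The "Task not serializable" failure in this thread happens because the closure passed to mapToPair captures the streaming context, which lives only on the driver and cannot be serialized and shipped to executors. A rough plain-Python analogy using pickle (no Spark involved; the lock is a hypothetical stand-in for a non-serializable driver-side object):

```python
import pickle
import threading

driver_context = threading.Lock()  # stand-in for a SparkContext: not picklable

def make_mapper():
    # a nested function, like the anonymous inner classes in the thread,
    # that references a driver-only object
    def mapper(path):
        return (path, driver_context)
    return mapper

try:
    pickle.dumps(make_mapper())  # what Spark's closure serializer must do
    print("serialized OK")
except Exception as exc:
    # pickling a local function (or anything dragging in the lock) fails,
    # the moral equivalent of Spark's "Task not serializable"
    print("not serializable:", type(exc).__name__)
```

The fix, as the reply says, is to keep all context usage on the driver - e.g. collect the paths first and call textFile there - rather than inside the shipped function.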
Re: Manually trigger RDD map function without action
Hey Kevin, I assume you want to trigger the map() for a side effect (since you don't care about the result). To Cody's point, you can use foreach() *instead* of map(). So instead of e.g. x.map(a => foo(a)).foreach(a => a), you'd run x.foreach(a => foo(a)). Best, -Sven On Mon, Jan 12, 2015 at 5:13 PM, Kevin Jung wrote: > Cody said "If you don't care about the value that your map produced > (because > you're not already collecting or saving it), then is foreach more > appropriate to what you're doing?" but I can not see it from this thread. > Anyway, I performed small benchmark to test what function is the most > efficient way. And a winner is foreach(a => a) according to everyone's > expectations. Collect can cause OOM from driver and count is very slower > than the others. Thanks all. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- http://sites.google.com/site/krasser/?utm_source=sig
Re: Manually trigger RDD map function without action
Cody said "If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing?" but I cannot see it from this thread. Anyway, I performed a small benchmark to test which function is the most efficient. The winner is foreach(a => a), in line with everyone's expectations. collect can cause OOM on the driver, and count is much slower than the others. Thanks all.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Manually trigger RDD map function without action
If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing? On Mon, Jan 12, 2015 at 4:08 AM, kevinkim wrote: > Hi, answer from another Kevin. > > I think you may already know it, > 'transformation' in spark > ( > http://spark.apache.org/docs/latest/programming-guide.html#transformations > ) > will be done in 'lazy' way, when you trigger 'actions'. > (http://spark.apache.org/docs/latest/programming-guide.html#actions) > > So you can use > 'collect' - if you need result from memory > 'count' - if you need to count > 'saveAs ...' - if you need to persist transformed RDD > > Regards, > Kevin > > > On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] > <[hidden > email] <http:///user/SendEmail.jtp?type=node&node=21095&i=0>> wrote: > >> Hi all >> Is there efficient way to trigger RDD transformations? I'm now using >> count action to achieve this. >> >> Best regards >> Kevin >> >> -- >> If you reply to this email, your message will be added to the >> discussion below: >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html >> To unsubscribe from Apache Spark User List, click here. 
Re: Manually trigger RDD map function without action
Hi, answer from another Kevin. I think you may already know it: 'transformations' in Spark (http://spark.apache.org/docs/latest/programming-guide.html#transformations) are evaluated lazily, when you trigger 'actions' (http://spark.apache.org/docs/latest/programming-guide.html#actions). So you can use 'collect' - if you need the result in memory, 'count' - if you need to count, 'saveAs...' - if you need to persist the transformed RDD.

Regards, Kevin

On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] <ml-node+s1001560n21094...@n3.nabble.com> wrote:
> Hi all
> Is there an efficient way to trigger RDD transformations? I'm now using the count action to achieve this.
>
> Best regards
> Kevin

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21095.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Manually trigger RDD map function without action
Hi all, Is there an efficient way to trigger RDD transformations? I'm now using the count action to achieve this.

Best regards, Kevin

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
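The laziness the replies in this thread describe can be illustrated with a plain Python generator, a loose analogy rather than Spark itself: nothing runs until something consumes the generator, just as transformations run only when an action is called.

```python
evaluated = []

def transform(x):
    evaluated.append(x)  # side effect records that work actually happened
    return x * 2

nums = [1, 2, 3]
lazy = (transform(x) for x in nums)  # like a transformation: nothing runs yet
assert evaluated == []

result = list(lazy)                  # like an action: forces evaluation
assert evaluated == [1, 2, 3]
print(result)  # [2, 4, 6]
```

This is also why a "map for side effects only" needs something to consume it, and why foreach (which consumes as it goes, keeping nothing) is the natural trigger when the mapped values are not wanted.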
Re: map function
Thanks, Paolo and Mark. :)

> On 04 Dec 2014, at 11:58, Paolo Platter wrote:
>
> Hi,
>
> rdd.flatMap( e => e._2.map( i => ( i, e._1)))
>
> Should work, but I didn't test it, so maybe I'm missing something.
>
> Paolo
>
> Sent from my Windows Phone
> From: Yifan LI <mailto:iamyifa...@gmail.com>
> Sent: 04/12/2014 09:27
> To: user@spark.apache.org <mailto:user@spark.apache.org>
> Subject: map function
>
> Hi,
>
> I have an RDD like below:
> (1, (10, 20))
> (2, (30, 40, 10))
> (3, (30))
> …
>
> Is there any way to map it to this:
> (10,1)
> (20,1)
> (30,2)
> (40,2)
> (10,2)
> (30,3)
> …
>
> generally, each element might be mapped to multiple elements.
>
> Thanks in advance!
>
> Best,
> Yifan LI
Re: map function
Hi,

rdd.flatMap( e => e._2.map( i => ( i, e._1)))

Should work, but I didn't test it, so maybe I'm missing something.

Paolo

Sent from my Windows Phone
From: Yifan LI <mailto:iamyifa...@gmail.com>
Sent: 04/12/2014 09:27
To: user@spark.apache.org <mailto:user@spark.apache.org>
Subject: map function

Hi,

I have an RDD like below:
(1, (10, 20))
(2, (30, 40, 10))
(3, (30))
…

Is there any way to map it to this:
(10,1)
(20,1)
(30,2)
(40,2)
(10,2)
(30,3)
…

generally, each element might be mapped to multiple elements.

Thanks in advance!

Best,
Yifan LI
Re: map function
rdd.flatMap { case (k, coll) => coll.map { elem => (elem, k) } } On Thu, Dec 4, 2014 at 1:26 AM, Yifan LI wrote: > Hi, > > I have a RDD like below: > (1, (10, 20)) > (2, (30, 40, 10)) > (3, (30)) > … > > Is there any way to map it to this: > (10,1) > (20,1) > (30,2) > (40,2) > (10,2) > (30,3) > … > > generally, for each element, it might be mapped to multiple. > > Thanks in advance! > > > Best, > Yifan LI > > > > > >
map function
Hi,

I have an RDD like below:
(1, (10, 20))
(2, (30, 40, 10))
(3, (30))
…

Is there any way to map it to this:
(10,1)
(20,1)
(30,2)
(40,2)
(10,2)
(30,3)
…

generally, each element might be mapped to multiple elements.

Thanks in advance!

Best,
Yifan LI
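Both replies in this thread implement the same inversion via flatMap; on plain Python lists (standing in for the RDD, no Spark required) the transformation looks like this:

```python
pairs = [(1, (10, 20)), (2, (30, 40, 10)), (3, (30,))]

# flatMap: for each (key, values) pair, emit one (value, key) per value
inverted = [(v, k) for k, values in pairs for v in values]
print(inverted)
# [(10, 1), (20, 1), (30, 2), (40, 2), (10, 2), (30, 3)]
```

Each input element contributes as many output elements as it has values, which is exactly why flatMap (rather than map) is the right tool.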
Re: Keep state inside map function
Doing cleanup in an iterator like that assumes the iterator always gets fully read, which is not necessarily the case (for example, RDD.take does not). Instead I would use mapPartitionsWithContext, in which case you can write a function of the form

f: (TaskContext, Iterator[T]) => Iterator[U]

Now you can register a cleanup with the task context, like this:

context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = doSomething
})

and after that proceed with an iterator transformation as usual.

On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen wrote:
> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer wrote:
> > rdd.mapPartitions { partition =>
> >   // Some setup code here
> >   val result = partition.map(yourfunction)
> >
> >   // Some cleanup code here
> >   result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
> > rdd.mapPartitions { partition =>
> >   if (!partition.isEmpty) {
> >
> >     // Some setup code here
> >     partition.map(item => {
> >       val output = yourfunction(item)
> >       if (!partition.hasNext) {
> >         // Some cleanup code here
> >       }
> >       output
> >     })
> >   } else {
> >     // return an empty Iterator of your return type
> >   }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
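A plain-Python sketch of the per-partition setup/cleanup shape discussed above (the Connection class is hypothetical; a generator's finally clause plays the role of the completion listener, with the same caveat the thread raises - it only runs once the iterator is fully consumed, explicitly closed, or garbage-collected):

```python
class Connection:
    # hypothetical expensive resource, created once per partition
    def __init__(self):
        self.closed = False
    def lookup(self, x):
        return x * 10
    def close(self):
        self.closed = True

def map_partition(partition):
    conn = Connection()          # setup once per partition
    try:
        for item in partition:
            yield conn.lookup(item)
    finally:
        conn.close()             # cleanup, even if the consumer stops early and closes us

out = list(map_partition(iter([1, 2, 3])))
print(out)  # [10, 20, 30]
```

The try/finally around the yield loop is the generator-based equivalent of registering cleanup with the task context rather than placing it after the last element.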
Re: How to use multi thread in RDD map function ?
If increasing executors really isn't enough, then you can consider using mapPartitions to process whole partitions at a time. Within that you can multi-thread your processing of the elements in the partition. (And you should probably use more like one worker per machine then.) The question is how to parallelize. If you can tolerate the input and output being in memory, then you can make the Iterator into a parallel collection and trivially map it in parallel locally. Otherwise you can look at Future.traverse to iterate it in parallel instead, but I have not tried it.

On Sep 28, 2014 4:44 AM, "myasuka" wrote:
> Hi, everyone. I came across a problem with increasing the concurrency. In a
> program, after shuffle write, each node should fetch 16 pairs of matrices to
> do matrix multiplication, such as:
>
> import breeze.linalg.{DenseMatrix => BDM}
>
> pairs.map(t => {
>   val b1 = t._2._1.asInstanceOf[BDM[Double]]
>   val b2 = t._2._2.asInstanceOf[BDM[Double]]
>   val c = (b1 * b2).asInstanceOf[BDM[Double]]
>   (new BlockID(t._1.row, t._1.column), c)
> })
>
> Each node has 16 cores. However, no matter whether I set 16 tasks or more on
> each node, the concurrency cannot get higher than 60%, which means not every
> core on the node is computing. Then I checked the running log on the WebUI;
> according to the amount of shuffle read and write in every task, I see some
> tasks do one matrix multiplication, some do two, while some do none.
>
> Thus, I thought of using Java multi-threading to increase the concurrency. I
> wrote a program in Scala which calls Java multi-threading without Spark on a
> single node; by watching the 'top' monitor, I found this program can use CPU
> up to 1500% (meaning nearly every core is computing). But I have no idea how
> to use Java multi-threading in an RDD transformation.
>
> Can anyone provide some example code using Java multi-threading in an RDD
> transformation, or give any idea to increase the concurrency?
>
> Thanks for all
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-multi-thread-in-RDD-map-function-tp15286.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to use multi thread in RDD map function ?
Thank you for your reply. Actually, we have already used this parameter. Our cluster is a standalone cluster with 16 nodes; every node has 16 cores. We have 256 pairs of matrices and 256 tasks. When we set --total-executor-cores to 64, each node launches 4 tasks simultaneously, each task does one matrix multiplication, and CPU usage is nearly 25%. If we set --total-executor-cores to 128, each node launches 8 tasks simultaneously, but not every task does a matrix multiplication, and CPU usage is nearly 35%. Then, if we set --total-executor-cores to 256, each node launches 16 tasks simultaneously, but still not every task does a matrix multiplication - some do none, some do two - and CPU usage is nearly 50%. If we can increase the concurrency to raise the CPU usage, we will cut the running time. Hope for any solution!

Qin Wei wrote
> in the options of spark-submit, there are two options which may be helpful
> to your problem: "--total-executor-cores NUM" (standalone and mesos only)
> and "--executor-cores" (yarn only)
>
> qinwei
>
> From: myasuka
> Date: 2014-09-28 11:44
> To: user
> Subject: How to use multi thread in RDD map function?
>
> Hi, everyone. I came across a problem with increasing the concurrency. In a
> program, after shuffle write, each node should fetch 16 pairs of matrices to
> do matrix multiplication, such as:
>
> import breeze.linalg.{DenseMatrix => BDM}
>
> pairs.map(t => {
>   val b1 = t._2._1.asInstanceOf[BDM[Double]]
>   val b2 = t._2._2.asInstanceOf[BDM[Double]]
>   val c = (b1 * b2).asInstanceOf[BDM[Double]]
>   (new BlockID(t._1.row, t._1.column), c)
> })
>
> Each node has 16 cores. However, no matter whether I set 16 tasks or more on
> each node, the concurrency cannot get higher than 60%, which means not every
> core on the node is computing. Then I checked the running log on the WebUI;
> according to the amount of shuffle read and write in every task, I see some
> tasks do one matrix multiplication, some do two, while some do none.
>
> Thus, I thought of using Java multi-threading to increase the concurrency. I
> wrote a program in Scala which calls Java multi-threading without Spark on a
> single node; by watching the 'top' monitor, I found this program can use CPU
> up to 1500% (meaning nearly every core is computing). But I have no idea how
> to use Java multi-threading in an RDD transformation.
>
> Can anyone provide some example code using Java multi-threading in an RDD
> transformation, or give any idea to increase the concurrency?
>
> Thanks for all

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-multi-thread-in-RDD-map-function-tp15286p15290.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to use multi thread in RDD map function ?
In the options of spark-submit, there are two options which may be helpful to your problem: "--total-executor-cores NUM" (standalone and mesos only) and "--executor-cores" (yarn only).

qinwei

From: myasuka
Date: 2014-09-28 11:44
To: user
Subject: How to use multi thread in RDD map function?

Hi, everyone. I came across a problem with increasing the concurrency. In a program, after shuffle write, each node should fetch 16 pairs of matrices to do matrix multiplication, such as:

import breeze.linalg.{DenseMatrix => BDM}

pairs.map(t => {
  val b1 = t._2._1.asInstanceOf[BDM[Double]]
  val b2 = t._2._2.asInstanceOf[BDM[Double]]
  val c = (b1 * b2).asInstanceOf[BDM[Double]]
  (new BlockID(t._1.row, t._1.column), c)
})

Each node has 16 cores. However, no matter whether I set 16 tasks or more on each node, the concurrency cannot get higher than 60%, which means not every core on the node is computing. Then I checked the running log on the WebUI; according to the amount of shuffle read and write in every task, I see some tasks do one matrix multiplication, some do two, while some do none.

Thus, I thought of using Java multi-threading to increase the concurrency. I wrote a program in Scala which calls Java multi-threading without Spark on a single node; by watching the 'top' monitor, I found this program can use CPU up to 1500% (meaning nearly every core is computing). But I have no idea how to use Java multi-threading in an RDD transformation.

Can anyone provide some example code using Java multi-threading in an RDD transformation, or give any idea to increase the concurrency?

Thanks for all
How to use multi thread in RDD map function ?
Hi, everyone

I have come across a problem with increasing concurrency. In my program, after the shuffle write, each node should fetch 16 pairs of matrices to do matrix multiplication, like this:

    import breeze.linalg.{DenseMatrix => BDM}

    pairs.map(t => {
      val b1 = t._2._1.asInstanceOf[BDM[Double]]
      val b2 = t._2._2.asInstanceOf[BDM[Double]]
      val c = (b1 * b2).asInstanceOf[BDM[Double]]
      (new BlockID(t._1.row, t._1.column), c)
    })

Each node has 16 cores, but no matter whether I schedule 16 tasks or more per node, CPU utilization never rises above 60%, which means not every core on the node is computing. Checking the running log on the Web UI, and judging by the amount of shuffle read and write in each task, I see that some tasks do one matrix multiplication, some do two, and some do none.

So I am thinking of using Java multithreading to increase the concurrency. I wrote a Scala program that calls Java threads without Spark on a single node; watching the 'top' monitor, I find this program can drive CPU usage up to 1500% (meaning nearly every core is computing). But I have no idea how to use Java threads in an RDD transformation.

Can anyone provide some example code that uses Java multithreading in an RDD transformation, or suggest another way to increase the concurrency?

Thanks to all

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-multi-thread-in-RDD-map-function-tp15286.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
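The usual suggestion for this situation is to run a local thread pool inside a mapPartitions function rather than inside map. Below is a minimal plain-Python sketch of that shape (no Spark required); multiply(), the worker count, and the sample pairs are hypothetical stand-ins for the Breeze matrix work above:

```python
# Sketch: fan the items of one partition out over a local thread pool,
# as you would inside rdd.mapPartitions(...). All names are made up.
from concurrent.futures import ThreadPoolExecutor

def multiply(pair):
    # stand-in for the Breeze matrix multiplication b1 * b2
    b1, b2 = pair
    return b1 * b2

def process_partition(pairs, workers=16):
    # pool.map preserves input order, like the sequential version
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(multiply, pairs))

print(process_partition([(2, 3), (4, 5), (6, 7)]))  # [6, 20, 42]
```

Note that in CPython the GIL limits CPU-bound threading, so this sketch only illustrates the shape; on the JVM (the Scala case above), a java.util.concurrent pool inside mapPartitions can keep all cores busy. Often the simpler fix is just more, smaller partitions per node.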
Re: Access file name in map function
If the size of each file is small, you may try SparkContext.wholeTextFiles. Otherwise you can try something like this:

    val filenames: Seq[String] = ...
    val combined: RDD[(String, String)] = filenames.map { name =>
      sc.textFile(name).map(line => name -> line)
    }.reduce(_ ++ _)

On 9/26/14 6:45 PM, Shekhar Bansal wrote:
> Hi
> In one of our usecase, filename contains timestamp and we have to append
> it in the record for aggregation. How can I access filename in map function?
> Thanks!
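The Scala snippet above can be mirrored in plain Python to show the idea: read each file separately and tag every line with the file it came from. read_lines() and the in-memory fake filesystem below are hypothetical stand-ins for sc.textFile:

```python
# Sketch: pair each line with its source filename, then concatenate,
# mirroring filenames.map { name => sc.textFile(name).map(name -> _) }.reduce(_ ++ _)
def tag_lines(filenames, read_lines):
    # read_lines(name) stands in for sc.textFile(name)
    combined = []
    for name in filenames:
        combined.extend((name, line) for line in read_lines(name))
    return combined

# hypothetical files keyed by name, values are their lines
fake_fs = {"2014-09-26.log": ["a", "b"], "2014-09-27.log": ["c"]}
print(tag_lines(sorted(fake_fs), fake_fs.get))
# [('2014-09-26.log', 'a'), ('2014-09-26.log', 'b'), ('2014-09-27.log', 'c')]
```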
Access file name in map function
Hi,
In one of our use cases, the filename contains a timestamp and we have to append it to each record for aggregation. How can I access the filename in a map function?
Thanks!
Re: Reference External Variables in Map Function (Inner class)
That should work. Gonna give it a try. Thanks ! On Tue, Aug 12, 2014 at 11:01 AM, Marcelo Vanzin wrote: > You could create a copy of the variable inside your "Parse" class; > that way it would be serialized with the instance you create when > calling map() below.
Re: Reference External Variables in Map Function (Inner class)
You could create a copy of the variable inside your "Parse" class; that way it would be serialized with the instance you create when calling map() below. On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri wrote: > Are there any other workarounds that could be used to pass in the values > from someVariable to the transformation function ? -- Marcelo
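Marcelo's fix can be sketched in plain Python: give the function object its own copy of the data, so the copy is serialized along with it, instead of relying on a static or global that may not be initialized on the worker JVM. pickle stands in here for Spark's closure serializer, and the Parse class below is a hypothetical reconstruction, not the original Java code:

```python
# Sketch: the callable carries a private copy of the shared data, so
# serializing the callable (as Spark does when shipping a closure)
# also ships the data it needs.
import pickle

class Parse(object):
    def __init__(self, some_variable):
        # private copy made at construction time; travels with the instance
        self.some_variable = list(some_variable)

    def __call__(self, value):
        # uses the instance's own copy, never a static/global
        return (value, len(self.some_variable))

parser = Parse(["a", "b", "c"])
shipped = pickle.loads(pickle.dumps(parser))  # round-trip, like shipping to a worker
print(shipped("x"))  # ('x', 3)
```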
Re: Reference External Variables in Map Function (Inner class)
Are there any other workarounds that could be used to pass in the values from *someVariable* to the transformation function ? On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen wrote: > I don't think static members are going to be serialized in the > closure? the instance of Parse will be looking at its local > SampleOuterClass, which is maybe not initialized on the remote JVM.
Re: Reference External Variables in Map Function (Inner class)
I don't think static members are going to be serialized in the closure? the instance of Parse will be looking at its local SampleOuterClass, which is maybe not initialized on the remote JVM. On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri wrote: > I have a class defining an inner static class (map function). The inner > class tries to refer the variable instantiated in the outside class, which > results in a NullPointerException. [...]
Reference External Variables in Map Function (Inner class)
I have a class defining an inner static class (map function). The inner class tries to refer to a variable instantiated in the outer class, which results in a NullPointerException. Sample code as follows:

    class SampleOuterClass {

        private static ArrayList someVariable;

        SampleOuterClass() {
            // initialize someVariable
        }

        public static class Parse implements Function<...> {
            public TypeReturn call (...) {
                // Try using someVariable: *Raises NullPointerException*
            }
        }

        public void run() {
            RDD<> rdd = data.map(new Parse()).rdd()
        }
    }

Am I missing something with how closures work with Spark, or is something else wrong?

Thanks
Sunny
Re: Keep state inside map function
On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer wrote: > rdd.mapPartitions { partition => >// Some setup code here >val result = partition.map(yourfunction) > >// Some cleanup code here >result > } Yes, I realized that after I hit send. You definitely have to store and return the result from the mapping! > rdd.mapPartitions { partition => >if (!partition.isEmpty) { > > // Some setup code here > partition.map(item => { >val output = yourfunction(item) >if (!partition.hasNext) { > // Some cleanup code here >} >output > }) >} else { > // return an empty Iterator of your return type >} > } Great point, yeah. If you knew the number of values were small you could collect them and process locally, but this is the right general way to do it.
Re: Keep state inside map function
Hi,

On Thu, Jul 31, 2014 at 2:23 AM, Sean Owen wrote:
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    partition.map(yourfunction)
>    // Some cleanup code here
> }

Please be careful with that, it will not work as expected. First, it would have to be

    rdd.mapPartitions { partition =>
      // Some setup code here
      val result = partition.map(yourfunction)
      // Some cleanup code here
      result
    }

because the function passed in to mapPartitions() needs to return an Iterator, and if you do it like this, then the cleanup code will run *before* the processing takes place, because partition.map() is executed lazily. One example of what actually works is:

    rdd.mapPartitions { partition =>
      if (!partition.isEmpty) {
        // Some setup code here
        partition.map(item => {
          val output = yourfunction(item)
          if (!partition.hasNext) {
            // Some cleanup code here
          }
          output
        })
      } else {
        // return an empty Iterator of your return type
      }
    }

That is not very pretty, but it is the only way I found to actually get tearDown code run after map() is run.

Tobias
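Tobias's working pattern has a natural plain-Python equivalent: write the partition function as a generator, so setup runs once, each item is processed lazily, and cleanup runs only after the last item has been consumed. The function below is a hypothetical sketch, not PySpark API:

```python
# Sketch of a mapPartitions-style function: takes an iterator over one
# partition, returns an iterator, and still runs cleanup after the
# lazy processing has actually finished.
def process_partition(partition):
    log = []                 # stand-in for setup (e.g. open a connection)
    log.append("setup")
    try:
        for item in partition:
            yield item * 2   # stand-in for yourfunction(item)
    finally:
        log.append("cleanup")  # runs only after the last item is consumed

print(list(process_partition(iter([1, 2, 3]))))  # [2, 4, 6]
```

In PySpark, rdd.mapPartitions(process_partition) accepts exactly this shape; the try/finally plays the role of the hasNext check in the Scala version, including the empty-partition case.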
Re: Keep state inside map function
Thanks to the both of you for your inputs. Looks like I'll play with the mapPartitions function to start porting MapReduce algorithms to Spark.

On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen wrote:
> Really, the analog of a Mapper is not map(), but mapPartitions(). Instead of:
>
> rdd.map(yourFunction)
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    partition.map(yourfunction)
>    // Some cleanup code here
> }
>
> You couldn't share state across Mappers, or Mappers and Reducers in
> Hadoop. (At least there was no direct way.) Same here. But you can
> maintain state across many map calls.
>
> On Wed, Jul 30, 2014 at 6:07 PM, Kevin wrote:
> > Hi,
> >
> > Is it possible to maintain state inside a Spark map function? [...]
Re: Keep state inside map function
Really, the analog of a Mapper is not map(), but mapPartitions(). Instead of:

    rdd.map(yourFunction)

... you can run setup code before mapping a bunch of records, and after, like so:

    rdd.mapPartitions { partition =>
      // Some setup code here
      partition.map(yourfunction)
      // Some cleanup code here
    }

You couldn't share state across Mappers, or Mappers and Reducers in Hadoop. (At least there was no direct way.) Same here. But you can maintain state across many map calls.

On Wed, Jul 30, 2014 at 6:07 PM, Kevin wrote:
> Hi,
>
> Is it possible to maintain state inside a Spark map function? With Hadoop
> MapReduce, Mappers and Reducers are classes that can have their own state
> using instance variables. Can this be done with Spark? Are there any
> examples?
>
> Most examples I have seen do a simple operation on the value passed into the
> map function and then pass it along to the reduce function.
>
> Thanks in advance.
>
> -Kevin
Re: Keep state inside map function
Use mapPartitions to get the equivalent functionality to Hadoop.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p10969.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Keep state inside map function
Hi,

Is it possible to maintain state inside a Spark map function? With Hadoop MapReduce, Mappers and Reducers are classes that can have their own state using instance variables. Can this be done with Spark? Are there any examples?

Most examples I have seen do a simple operation on the value passed into the map function and then pass it along to the reduce function.

Thanks in advance.

-Kevin
Re: Map Function does not seem to be executing over RDD
Does this line println("Retuning "+string) from the hash function print what you expect? If you're not seeing that output in the executor log I'd also put some debug statements in "case other", since your match in the "interesting" case is conditioned on if( fieldsList.contains(index)) -- maybe that doesn't catch what you think it should...if that's the case you can dump out the contents of fieldsList within the "other" case (i.e. inside the map) and see what's there... On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman wrote: > Hello every one > > I am having some problem with a simple Scala/ Spark Code in which I am > trying to replaces certain fields in a csv with their hashes > > class DSV (var line:String="",fieldsList:Seq[Int], var delimiter:String=",") > extends Serializable { > > def hash(s:String):String={ > var md = MessageDigest.getInstance("sha") > md.update(s.getBytes("UTF-8")) > > var digest = md.digest() > > val string:Option[String] = Option(digest).map(Hex.valueOf) > > println("Retuning "+string) > string.getOrElse("") > } > > def anonymizeFields(l:String):String ={ > l.split(delimiter,-1).zipWithIndex > .map { > case (str, index) if( fieldsList.contains(index)) > =>hash(str) > case other => other._1 > }.mkString(delimiter) > } > } > > I am calling the anonymize function like this but the anondata seems to be > the same as the original dsvData > > var dsvData = sc.textFile(inputPath+inputFileName).map( > line=>(new DSV(line,List(1,2), "\\|")) > ) > > println("Lines Processed="+dsvData.count()) > var anonData = dsvData.map(l=>l.anonymizeFields(l.line)) > > println("DSVs Processed="+anonData.count()) > anonData.saveAsTextFile(outputPath+outputFileName) > > I have tried the execution through shell as well but the problem persists. 
> The job does finish but the worker log shows the following error message > > 14/07/09 11:30:20 ERROR EndpointWriter: AssociationError > [akka.tcp://sparkWorker@host:60593] -> > [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with > [akka.tcp://sparkExecutor@host:51397]] [ > > Regards > MRK
Map Function does not seem to be executing over RDD
Hello everyone

I am having a problem with a simple Scala/Spark program in which I am trying to replace certain fields in a CSV with their hashes:

    class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",")
        extends Serializable {

      def hash(s: String): String = {
        val md = MessageDigest.getInstance("sha")
        md.update(s.getBytes("UTF-8"))
        val digest = md.digest()
        val string: Option[String] = Option(digest).map(Hex.valueOf)
        println("Retuning " + string)
        string.getOrElse("")
      }

      def anonymizeFields(l: String): String = {
        l.split(delimiter, -1).zipWithIndex
          .map {
            case (str, index) if (fieldsList.contains(index)) => hash(str)
            case other => other._1
          }.mkString(delimiter)
      }
    }

I am calling the anonymize function like this, but anonData seems to be the same as the original dsvData:

    var dsvData = sc.textFile(inputPath + inputFileName).map(
      line => (new DSV(line, List(1, 2), "\\|"))
    )

    println("Lines Processed=" + dsvData.count())
    var anonData = dsvData.map(l => l.anonymizeFields(l.line))

    println("DSVs Processed=" + anonData.count())
    anonData.saveAsTextFile(outputPath + outputFileName)

I have tried the execution through the shell as well, but the problem persists. The job does finish, but the worker log shows the following error message:

    14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
    [akka.tcp://sparkWorker@host:60593] ->
    [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with
    [akka.tcp://sparkExecutor@host:51397]] [

Regards
MRK
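For reference, what anonymizeFields is meant to do can be sketched in a few lines of plain Python; hashlib's sha1 is a stand-in for MessageDigest.getInstance("sha") plus the Hex helper, and the sample line and field indices below are made up:

```python
# Sketch: split a delimited line, replace the fields at the given
# indices with their hex digests, and re-join with the same delimiter.
import hashlib

def anonymize_fields(line, fields_list, delimiter="|"):
    def hash_field(s):
        # hex digest of the field, like hash() in the Scala class
        return hashlib.sha1(s.encode("utf-8")).hexdigest()
    return delimiter.join(
        hash_field(part) if index in fields_list else part
        for index, part in enumerate(line.split(delimiter))
    )

out = anonymize_fields("id|alice|secret|rest", [1, 2])
print(out.split("|")[0], out.split("|")[3])  # id rest
```

A quick check like this, run locally on one line, also helps separate the hashing logic from any Spark serialization or cluster issue.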
Access original filename in a map function
Hi Spark folks,

I have a directory full of files that I want to process using PySpark. There is some necessary metadata in the filename that I would love to attach to each record in that file. Using Java MapReduce, I would access ((FileSplit) context.getInputSplit()).getPath().getName() in the setup() method of the mapper. Using Hadoop Streaming, I can access the environment variable map_input_file to get the filename. Is there something I can do in PySpark to get the filename? Surely, one solution would be to get the list of files first, load each one as an RDD separately, and then union them together. But listing the files in HDFS is a bit annoying through Python, so I was wondering if the filename is somehow attached to a partition.

Thanks!
Uri

--
Uri Laserson, PhD
Data Scientist, Cloudera
Twitter/GitHub: @laserson
+1 617 910 0447
laser...@cloudera.com
Create a new object in pyspark map function
Hi all,

I am trying to create a new object in a map function, but pyspark reports a lot of errors. Is it legal to do so? Here is my code:

    class Node(object):
        def __init__(self, A, B, C):
            self.A = A
            self.B = B
            self.C = C

    def make_vertex(pair):
        A, (B, C) = pair
        return Node(A, B, C)

    dictionary = {'PYTHONPATH': '/home/grad02/xss/opt/old'}
    sc = SparkContext("spark://zzz:7077", "test job", environment=dictionary)
    rdd = sc.parallelize([[1, (2, 3)]])
    noMap = make_vertex([1, (2, 3)])
    print(noMap.A)
    myRdd = rdd.map(make_vertex)
    result = myRdd.collect()

Could anybody tell me whether creating a new object in a map function in pyspark is legal?

Thanks,
Kaden
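Creating an object inside a map function is legal in PySpark; what usually breaks is pickling, since the worker must be able to import both the mapped function and the returned class, so they should live at module top level. A minimal sketch with plain pickle (PySpark serializes Python objects with pickle-based serializers), no Spark required:

```python
# Sketch: a top-level class and a top-level function pickle cleanly,
# which is the property PySpark needs when it ships them to workers.
import pickle

class Node(object):          # top-level definition: picklable
    def __init__(self, A, B, C):
        self.A, self.B, self.C = A, B, C

def make_vertex(pair):
    A, (B, C) = pair
    return Node(A, B, C)

node = make_vertex([1, (2, 3)])
restored = pickle.loads(pickle.dumps(node))  # round-trips without error
print(restored.A, restored.B, restored.C)    # 1 2 3
```

A common cause of the reported errors (a guess here, since the error text was not included) is defining Node in an interactive shell or in a module the workers cannot import; moving it into a module on the workers' PYTHONPATH usually fixes that.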