> and then calling getRowID() in the lambda, because the function gets sent to the executor, right?
Yes, that is correct (vs. a one-time evaluation, as with your earlier assignment).

On Thu, Dec 10, 2015 at 3:34 AM Pinela <pin...@gmail.com> wrote:

> Hey Bryan,
>
> Thanks for the answer ;) I knew it was a basic python/spark-noob thing :)
>
> This also worked:
>
> def getRowID():
>     return datetime.now().strftime("%Y%m%d%H%M%S")
>
> and then calling getRowID() in the lambda, because the function gets sent
> to the executor, right?
>
> Thanks again for the quick reply :)
>
> All the best and Happy Holidays.
> Jpinela.
>
> On Wed, Dec 9, 2015 at 8:22 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> rowid in your code is a variable in the driver, so it is evaluated once
>> and only its value is sent to words.map. You probably want rowid to be a
>> lambda itself, so that it gets the value at the time it is evaluated.
>> For example:
>>
>> >>> data = sc.parallelize([1,2,3])
>> >>> from datetime import datetime
>> >>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>> >>> mdata = data.map(lambda x: (rowid(), x))
>> >>> mdata.collect()
>> [('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>> >>> mdata.collect()
>> [('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]
>>
>> Here rowid is evaluated whenever an action, i.e. collect, is called on
>> the RDD.
>>
>> On Wed, Dec 9, 2015 at 10:23 AM, jpinela <pin...@gmail.com> wrote:
>>
>>> Hi guys,
>>> I am sure this is a simple question, but I can't find it in the docs
>>> anywhere.
>>> This reads from Flume and writes to HBase (as you can see), but it has
>>> a variable scope problem (I believe).
>>> I have the following code:
>>>
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.flume import FlumeUtils
>>> from datetime import datetime
>>>
>>> ssc = StreamingContext(sc, 5)
>>> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>>>         "hbase.mapred.outputtable": "teste2",
>>>         "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>>>         "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>>>         "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>>
>>> keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>>
>>> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
>>> words = lines.map(lambda line: line[1])
>>> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
>>> outrdd = words.map(lambda x: (str(1), [rowid, "cf1desc", "col1", x]))
>>> print("ok 1")
>>> outrdd.pprint()
>>> outrdd.foreachRDD(lambda x: x.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv))
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> The issue is that the rowid variable always keeps the value from the
>>> moment the streaming job was started.
>>> How can I get around this? I tried a function, an application, nothing
>>> worked.
>>> Thank you.
>>> jp
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
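
Putting the thread's advice together, the fix is to compute the timestamp inside the map lambda (for example via the getRowID() helper Pinela mentions), so it is re-evaluated as each batch is processed rather than once on the driver when the job is defined. Below is a minimal sketch of the corrected streaming job, assuming the same environment as the original post (an existing SparkContext named sc, a Flume source on ubuntu3:9997, and the example Python converter classes on the classpath); it is an illustration of the idea, not a verified end-to-end pipeline.

    from datetime import datetime
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    def getRowID():
        # Called inside the map lambda, so it runs on the executors for
        # each record/batch instead of once on the driver.
        return datetime.now().strftime("%Y%m%d%H%M%S")

    ssc = StreamingContext(sc, 5)  # assumes an existing SparkContext `sc`

    conf = {"hbase.zookeeper.quorum": "ubuntu3",
            "hbase.mapred.outputtable": "teste2",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
    words = lines.map(lambda line: line[1])

    # getRowID() is invoked inside the lambda: the function itself is
    # shipped to the executors, so the row id reflects processing time.
    outrdd = words.map(lambda x: (str(1), [getRowID(), "cf1desc", "col1", x]))

    outrdd.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopDataset(
        conf=conf, keyConverter=keyConv, valueConverter=valueConv))

    ssc.start()
    ssc.awaitTermination()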