rowid in your code is a plain variable in the driver, so it is evaluated once and only that value is shipped to words.map. You probably want rowid to be a lambda itself, so that the value is computed at the time it is evaluated. For example, if I have the following:
>>> data = sc.parallelize([1, 2, 3])
>>> from datetime import datetime
>>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>>> mdata = data.map(lambda x: (rowid(), x))
>>> mdata.collect()
[('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>>> mdata.collect()
[('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]

Here rowid is evaluated whenever an action (e.g. collect) is called on the RDD. A sketch of the same change applied to your streaming code is at the bottom, after your quoted message.

On Wed, Dec 9, 2015 at 10:23 AM, jpinela <pin...@gmail.com> wrote:
> Hi Guys,
> I am sure this is a simple question, but I can't find it anywhere in the docs.
> This reads from Flume and writes to HBase (as you can see),
> but it has a variable scope problem (I believe).
> I have the following code:
>
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.flume import FlumeUtils
> from datetime import datetime
>
> ssc = StreamingContext(sc, 5)
> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>         "hbase.mapred.outputtable": "teste2",
>         "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>         "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>         "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>
> keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
> valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>
> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
> words = lines.map(lambda line: line[1])
> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
> outrdd = words.map(lambda x: (str(1), [rowid, "cf1desc", "col1", x]))
> print("ok 1")
> outrdd.pprint()
>
> outrdd.foreachRDD(lambda x: x.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv))
>
> ssc.start()
> ssc.awaitTermination()
>
> The issue is that the rowid variable always stays at the value from when the
> streaming began.
> How can I get around this? I tried a function, an application, nothing
> worked.
> Thank you.
> jp
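
Applied to the quoted streaming code, a minimal sketch of the same idea (keeping the FlumeUtils/HBase setup exactly as posted, and only turning rowid into a function that the map lambda calls) would be:

from datetime import datetime

# Called for every record of every micro-batch, so each record gets the
# timestamp of when it is processed rather than the timestamp of when the
# DStream was defined in the driver.
def rowid():
    return datetime.now().strftime("%Y%m%d%H%M%S")

lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
words = lines.map(lambda line: line[1])
outrdd = words.map(lambda x: (str(1), [rowid(), "cf1desc", "col1", x]))

The foreachRDD / saveAsNewAPIHadoopDataset part can stay as it is; because the map lambda now closes over the function rather than a pre-computed string, each batch picks up a fresh timestamp.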