Hello Subacini,
Until someone more knowledgeable suggests a better, more straightforward,
and simpler approach with a working code snippet, I suggest the following
workaround / hack:
inputStream.foreachRDD { rdd =>
  val myStr = rdd.toDebugString
  // process the myStr string value, e.g. using regular expressions
}
For example, if you print myStr, you can see something similar to the
following in your log / console output:
15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1
15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
(20) MappedRDD[27] at textFileStream at kmeans.scala:17 []
| UnionRDD[26] at textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at
textFileStream at kmeans.scala:17 []
| file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at
textFileStream at kmeans.scala:17 []
15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time
1424787295000 ms (execution: 0.051 s)
15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list
15/02/24 15:14:56 INFO BlockManager: Removing RDD 5
15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were
older than 1424787235000 ms:
15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
You can process the string to retrieve each section that starts with file:
and ends with a space. Then, for each such path, you can extract your
timestamp from the file name; see the sketch below.
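Here is a rough, untested sketch of that idea, assuming inputStream is the
DStream returned by your textFileStream call and that your file names end
with a millisecond timestamp such as ABC_1421893256000.txt (the regular
expressions below are just illustrative):

// Sketch only: it relies on the format of toDebugString, which is meant
// for debugging and may change between Spark versions.
val filePattern = """file:(\S+)""".r   // a path starts with "file:" and ends at whitespace
val tsPattern   = """_(\d+)\.\w+$""".r // assumes names like ABC_1421893256000.txt

inputStream.foreachRDD { rdd =>
  val myStr = rdd.toDebugString
  val fileNames = filePattern.findAllMatchIn(myStr).map(_.group(1)).toList
  val timestamps = fileNames.flatMap { name =>
    tsPattern.findFirstMatchIn(name).map(_.group(1).toLong)
  }
  // use fileNames / timestamps together with rdd here
}

It is brittle because it parses debug output, but it should do the job until
a cleaner, API-based solution turns up.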
--
Emre Sevinç
http://www.bigindustries.be/
On Fri, Feb 6, 2015 at 9:33 PM, Subacini B subac...@gmail.com wrote:
Thank you Emre, this helps, I am able to get the filename.
But I am not sure how to fit this into a DStream RDD.
val inputStream = ssc.textFileStream("/hdfs Path/")
inputStream is a DStream, and in foreachRDD I am doing my processing:
inputStream.foreachRDD { rdd =>
  // how to get the filename here??
}
Can you please help?
On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc emre.sev...@gmail.com
wrote:
Hello,
Did you check the following?
http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/
http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html
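For a plain batch RDD read via sc.hadoopFile, a minimal sketch of what those
posts describe might look roughly like the following; the path is a
placeholder and I have not tested this exact snippet:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Read the files through the old Hadoop API so we keep access to the input splits.
val hadoopRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/some/input/path")
  .asInstanceOf[HadoopRDD[LongWritable, Text]]

// Pair every line with the name of the file it came from.
val linesWithFileNames = hadoopRdd.mapPartitionsWithInputSplit {
  (split: InputSplit, iter: Iterator[(LongWritable, Text)]) =>
    val fileName = split.asInstanceOf[FileSplit].getPath.toString
    iter.map { case (_, line) => (fileName, line.toString) }
}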
--
Emre Sevinç
On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote:
Hi All,
We have file names with timestamps, e.g. ABC_1421893256000.txt, and the
timestamp needs to be extracted from the file name for further processing. Is
there a way to get the input file name picked up by a Spark Streaming job?
Thanks in advance
Subacini