Instead of print() you should do jsonIn.count().print(). A straightforward approach is to use foreachRDD :)
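
A minimal sketch of the first option, assuming jsonIn is the
JavaDStream<String> from the code quoted below (a sketch, not tested
against your app):

    // count() reduces each batch to a single long, so computing it has to
    // evaluate every partition of the batch; print() then acts as the
    // output operation that actually triggers the job.
    jsonIn.count().print();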
Thanks
Best Regards

On Mon, Feb 16, 2015 at 6:48 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello Sean,
>
> I did not understand your question very well, but what I do is check the
> output directory (and I have various logger outputs at various stages
> showing the contents of an input file being processed, the response from
> the web service, etc.).
>
> By the way, I've already solved my problem by using foreachRDD instead of
> print (see my second message in this thread). Apparently forcing Spark to
> materialize the DAG via print() is not the way to go. (My interpretation
> might be wrong, but this is what I've just seen in my case.)
>
> --
> Emre
>
> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> How are you deciding whether files are processed or not? It doesn't seem
>> possible from this code. Maybe it just seems so.
>>
>> On Feb 16, 2015 12:51 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:
>>
>>> I've managed to solve this, but I still don't know exactly why my
>>> solution works.
>>>
>>> In my code I was trying to force Spark to produce output via:
>>>
>>>     jsonIn.print();
>>>
>>> jsonIn being a JavaDStream<String>.
>>>
>>> When I removed the line above and instead added the code below to force
>>> the output operation, and hence the execution:
>>>
>>>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>       @Override
>>>       public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>>         stringJavaRDD.collect();
>>>         return null;
>>>       }
>>>     });
>>>
>>> it works as I expect, processing all of the 20 files I give it instead
>>> of stopping at 16.
>>>
>>> --
>>> Emre
>>>
>>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>>>> following manner:
>>>>
>>>> - Listen to the input directory.
>>>> - If a new file is copied to that input directory, process it.
>>>> - Processing: contact a RESTful web service (also running locally and
>>>>   responsive), send it the contents of the file, receive the response,
>>>>   and write the result as a new file into the output directory.
>>>> - Batch interval: 30 seconds.
>>>> - Checkpoint interval: 150 seconds.
>>>>
>>>> When I test the application locally with 1 or 2 files, it works
>>>> perfectly fine, as expected. I run it like:
>>>>
>>>>     spark-submit --class myClass --verbose --master local[4] \
>>>>       --deploy-mode client myApp.jar /in file:///out
>>>>
>>>> But then I noticed something strange when I copied 20 files to the
>>>> input directory: Spark Streaming detects all of the files, but it ends
>>>> up processing *only 16 files*. The remaining 4 are not processed at
>>>> all.
>>>>
>>>> I tried it with 19, 18, and then 17 files. Same result: only 16 files
>>>> end up in the output directory.
>>>>
>>>> Then I tried copying exactly 16 files at once to the input directory,
>>>> and it processed all of them. That's why I call it the magic number 16.
>>>>
>>>> When I say it detects all of the files, I mean that in the logs I see
>>>> the following lines when I copy 17 files:
>>>>
>>>> ===========================================================================
>>>> 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: "1G"
>>>> 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves
>>>> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
>>>> eth0)
>>>> 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to
>>>> bind to another address
>>>> 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started
>>>> 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load
>>>> native-hadoop library for your platform... using builtin-java classes
>>>> where applicable
>>>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>>>> file:/tmp/receivedBlockMetadata
>>>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>>>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>>>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>>>> -------------------------------------------
>>>> Time: 1424086260000 ms
>>>> -------------------------------------------
>>>>
>>>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> [the line above appears 17 times in total, once per input file]
>>>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085990000:
>>>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085990000
>>>>
>>>> -------------------------------------------
>>>> Time: 1424086290000 ms
>>>> -------------------------------------------
>>>> ===========================================================================
>>>>
>>>> In other words, it says "Total input paths to process : 1" 17 times.
>>>> And when I copy 20 files, it says that 20 times.
>>>>
>>>> But it always ends up processing 16 files at once, and the remaining
>>>> ones are not processed at all.
>>>>
>>>> However, if I first copy 16 files to the input directory, wait for the
>>>> Spark Streaming application to process them (by checking the output
>>>> directory and seeing that 16 files have been created properly), and
>>>> then copy 4 more files, those 4 files are also processed!
>>>>
>>>> So now I'm in the weird situation that I can copy at most 16 files at
>>>> once, wait for them to be processed, and only then copy at most 16
>>>> more, ... otherwise I lose the extra files, in the sense that they are
>>>> never processed. This is not acceptable for my use case.
>>>>
>>>> I've also checked the parameter
>>>>
>>>>     spark.streaming.receiver.maxRate
>>>>
>>>> It is infinite by default; I've tried setting it to 10, for example,
>>>> and nothing changed.
>>>>
>>>> Any ideas what might be causing this situation, this magic number of
>>>> 16 files at once?
>>>>
>>>> --
>>>> Emre Sevinç
>>>
>>> --
>>> Emre Sevinc
>>
>
> --
> Emre Sevinc
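
As for why print() alone can leave files unprocessed (my understanding of
the internals; I have not verified this against the 1.2.1 source): print()
only displays the first 10 records of each batch and is built on
RDD.take(), which evaluates just enough partitions to gather that many
records. With one small input file per partition, the remaining partitions
of a batch may never be computed, and since the batch still completes, the
skipped files are never revisited. (Why the cutoff lands exactly at 16 I
can't say without digging into how take() ramps up the number of partitions
it scans.) The collect() in the foreachRDD quoted above forces every
partition, which is why it works, but it also ships the whole batch to the
driver. A lighter sketch of the same workaround, using count() instead of
collect() (untested, assuming the same imports as your code):

    jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        // Evaluates every partition, but only a single long travels back
        // to the driver instead of the full contents of the batch.
        rdd.count();
        return null;
      }
    });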
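
Also, regarding spark.streaming.receiver.maxRate: as far as I know, that
setting only throttles receiver-based input streams (sockets, Kafka
receivers, and so on). A file stream has no receiver (the driver simply
lists the directory every batch interval), so it is expected that changing
it had no effect here. For completeness, it is set on the SparkConf, e.g.
(a sketch; the variable names are made up):

    SparkConf conf = new SparkConf().setAppName("myApp");
    // Caps records per second per receiver; file streams have no
    // receiver, so this does not influence directory-based input.
    conf.set("spark.streaming.receiver.maxRate", "10");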