I've managed to solve this, but I still don't know exactly why my solution works:
In my code I was trying to force Spark to produce output via:

    jsonIn.print();

where jsonIn is a JavaDStream<String>.

When I removed the code above and added the code below to force the output
operation, and hence the execution:

    jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        stringJavaRDD.collect();
        return null;
      }
    });

it works as I expect, processing all of the 20 files I give to it, instead
of stopping at 16.

--
Emre

On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello,
>
> I have an application in Java that uses Spark Streaming 1.2.1 in the
> following manner:
>
>  - Listen to the input directory.
>  - If a new file is copied to that input directory, process it.
>  - Process: contact a RESTful web service (also running locally and
>    responsive), send the contents of the file, receive the response from
>    the web service, and write the results as a new file into the output
>    directory.
>  - batch interval: 30 seconds
>  - checkpoint interval: 150 seconds
>
> When I test the application locally with 1 or 2 files, it works perfectly
> fine, as expected. I run it like:
>
>     spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out
>
> But then I noticed something strange when I copied 20 files to the
> INPUT directory: Spark Streaming detects all of the files, but it ends up
> processing *only 16 files*. The remaining 4 are not processed at all.
>
> I've tried it with 19, 18, and then 17 files. Same result: only 16 files
> end up in the output directory.
>
> Then I tried copying 16 files at once to the input directory, and
> it can process all of the 16 files. That's why I call it magic number 16.
>
> When I say it detects all of the files, I mean that in the logs I see the
> following lines when I copy 17 files:
>
> ===============================================================================================================================
> 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: "1G"
> 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
> 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address
> 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started
> 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata
> 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs:
> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
> -------------------------------------------
> Time: 1424086260000 ms
> -------------------------------------------
>
> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 1424085960000:
> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 1424085960000
> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 1424085960000:
> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 1424085960000
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 1424085990000:
> 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 1424085990000
>
> -------------------------------------------
> Time: 1424086290000 ms
> -------------------------------------------
>
> ===============================================================================================================================
>
> In other words, it says "Total input paths to process : 1" 17 times. And
> when I copy 20 files, it says that 20 times.
>
> But it always ends up processing 16 files at once, and the remaining ones
> are not processed at all.
>
> However, if I first copy 16 files to the input directory, wait for the
> Spark Streaming application to process them (by checking the output
> directory and seeing that 16 files have been created properly), and then
> copy 4 more files, those 4 files are also processed!
>
> So now I'm in the weird situation that I have to copy at most 16 files at
> once, wait for them to be processed, and only after that copy another 16
> files at most, and so on. Otherwise I lose the extra files, in the sense
> that they are not processed. This is not acceptable in my use case.
>
> I've also checked the parameter
>
>     spark.streaming.receiver.maxRate
>
> and it is infinite by default. I've tried setting it to 10, for example,
> and nothing changed.
>
> Any ideas what might be causing this situation, having a magic number of
> 16 files at once?
>
> --
> Emre Sevinç

--
Emre Sevinc
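
A possible reading of the difference between the two output operations in the reply at the top, offered as a guess and not a confirmed diagnosis: print() only shows the first few elements of each batch, so it may not need to evaluate every partition, whereas collect() has to materialize all of them. The sketch below is plain Java, not Spark code. The class LazyOutputSketch, its method names, and the limit of 5 are all made up for illustration, and it makes no attempt to reproduce the number 16.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration only: models lazily evaluated "partitions",
// one per input file. None of these names come from Spark.
public class LazyOutputSketch {

    // Builds `count` lazy partitions; each one increments `evaluated`
    // only when it is actually computed.
    static List<Supplier<String>> partitions(int count, AtomicInteger evaluated) {
        List<Supplier<String>> parts = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            parts.add(() -> {
                evaluated.incrementAndGet(); // stands in for "file processed"
                return "file-" + id;
            });
        }
        return parts;
    }

    // Print-like output: stops as soon as `limit` elements are available,
    // so later partitions are never computed.
    static int evaluatedByFirstN(int total, int limit) {
        AtomicInteger evaluated = new AtomicInteger();
        List<String> shown = new ArrayList<>();
        for (Supplier<String> part : partitions(total, evaluated)) {
            if (shown.size() >= limit) {
                break;
            }
            shown.add(part.get());
        }
        return evaluated.get();
    }

    // Collect-like output: every partition is computed.
    static int evaluatedByCollect(int total) {
        AtomicInteger evaluated = new AtomicInteger();
        List<String> all = new ArrayList<>();
        for (Supplier<String> part : partitions(total, evaluated)) {
            all.add(part.get());
        }
        return evaluated.get();
    }

    public static void main(String[] args) {
        // The limit 5 is arbitrary; it is not Spark's actual print() limit.
        System.out.println("first-N evaluated:  " + evaluatedByFirstN(20, 5));
        System.out.println("collect evaluated:  " + evaluatedByCollect(20));
    }
}
```

If this reading is right, any output operation that touches every element should behave like the collect() version, and collecting to the driver is only one way to get there.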