How are you deciding whether files are processed or not? It doesn't seem
possible to tell from this code; maybe it just seems that way.
On Feb 16, 2015 12:51 PM, "Emre Sevinc" <[email protected]> wrote:
> I've managed to solve this, but I still don't know exactly why my solution
> works:
>
> In my code I was trying to force the Spark to output via:
>
> jsonIn.print();
>
> jsonIn being a JavaDStream<String>.
>
> When I removed the code above and added the code below to force the output
> operation (and hence the execution):
>
> jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     @Override
>     public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>         // collect() is an output action, so it forces the job to actually run.
>         stringJavaRDD.collect();
>         return null;
>     }
> });
>
> Now it works as I expect, processing all 20 of the files I give to it instead
> of stopping at 16.
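>
> (As a side note: collect() pulls every element of each RDD back to the
> driver. I assume any other output action, e.g. count(), would also force
> the execution, but I haven't verified that it avoids the problem in the
> same way.)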
>
> --
> Emre
>
>
> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <[email protected]>
> wrote:
>
>> Hello,
>>
>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>> following manner (a rough sketch of the driver follows this list):
>>
>> - Listen to the input directory.
>> - If a new file is copied to that input directory, process it.
>> - Processing means: contact a RESTful web service (also running locally,
>> and responsive), send it the contents of the file, receive the response
>> from the web service, and write the results as a new file into the output
>> directory.
>> - Batch interval: 30 seconds
>> - Checkpoint interval: 150 seconds
>>
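>> Roughly, the driver is wired up like this (a simplified sketch rather than
>> the exact code; the class name, checkpoint directory, and comments are
>> placeholders):
>>
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.streaming.Duration;
>> import org.apache.spark.streaming.api.java.JavaDStream;
>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>
>> public class SpotlightDriver {
>>   public static void main(String[] args) {
>>     String inputDir = args[0];   // e.g. /in
>>     String outputDir = args[1];  // e.g. file:///out
>>
>>     SparkConf conf = new SparkConf().setAppName("SpotlightDriver");
>>
>>     // 30-second batch interval.
>>     JavaStreamingContext jssc =
>>         new JavaStreamingContext(conf, new Duration(30 * 1000));
>>     jssc.checkpoint("/tmp/checkpoint"); // placeholder checkpoint directory
>>
>>     // Watch the input directory; each batch contains the contents of the
>>     // newly copied files.
>>     JavaDStream<String> jsonIn = jssc.textFileStream(inputDir);
>>
>>     // Checkpoint this stream every 150 seconds (i.e. every 5 batches).
>>     jsonIn.checkpoint(new Duration(150 * 1000));
>>
>>     // ... for each record: call the RESTful web service and write the
>>     // result as a new file into outputDir ...
>>
>>     jssc.start();
>>     jssc.awaitTermination();
>>   }
>> }
>>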
>> When I test the application locally with 1 or 2 files, it works perfectly
>> as expected. I run it like this:
>>
>> spark-submit --class myClass --verbose --master local[4]
>> --deploy-mode client myApp.jar /in file:///out
>>
>> But then I noticed something strange when I copied 20 files to the input
>> directory: Spark Streaming detects all of the files, but it ends up
>> processing *only 16 files*, and the remaining 4 are not processed at all.
>>
>> I've tried it with 19, 18, and then 17 files. Same result: only 16 files
>> end up in the output directory.
>>
>> Then I tried copying exactly 16 files at once to the input directory, and
>> it processed all 16 of them. That's why I call 16 the magic number.
>>
>> When I say it detects all of the files, I mean that in the logs I see the
>> following lines when I copy 17 files:
>>
>>
>> ===============================================================================================================================
>> 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: "1G"
>> 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves
>> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
>> eth0)
>> 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to
>> bind to another address
>> 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started
>> 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load
>> native-hadoop library for your platform... using builtin-java classes where
>> applicable
>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>> file:/tmp/receivedBlockMetadata
>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>> -------------------------------------------
>> Time: 1424086260000 ms
>> -------------------------------------------
>>
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085990000:
>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085990000
>>
>> -------------------------------------------
>>
>> Time: 1424086290000 ms
>> -------------------------------------------
>>
>> ===============================================================================================================================
>>
>> In other words, it says "Total input paths to process : 1" 17 times. And
>> when I copy 20 files, it says that 20 times.
>>
>> But it always ends up processing only 16 files at once, and the remaining
>> ones are not processed at all.
>>
>> However, if I first copy 16 files to the input directory, wait for the Spark
>> Streaming application to process them (by checking the output directory and
>> seeing that 16 files have been created properly), and then copy 4 more
>> files, those 4 files are also processed!
>>
>> So now I'm in a weird situation where I have to copy at most 16 files at
>> once, wait for them to be processed, and only after that copy at most 16
>> more files, and so on; otherwise I lose the extra files, in the sense that
>> they are never processed. This is not acceptable in my use case.
>>
>> I've also checked the parameter
>>
>>   spark.streaming.receiver.maxRate
>>
>> which is infinite by default. I've tried setting it to 10, for example, and
>> nothing changed.
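>>
>> For reference, I set it roughly like this in the SparkConf used to create
>> the streaming context (a sketch, not the exact code):
>>
>> SparkConf conf = new SparkConf()
>>     .setAppName("SpotlightDriver")
>>     // Tried limiting the ingestion rate; it made no visible difference.
>>     .set("spark.streaming.receiver.maxRate", "10");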
>>
>> Any ideas about what might be causing this situation, with a magic number
>> of 16 files processed at once?
>>
>>
>> --
>> Emre Sevinç
>>
>
>
>
> --
> Emre Sevinc
>