Hello Spark Group: I've run into trouble using Spark & YARN on *Windows*. Here is a brief summary of my conclusions:
- A Spark Streaming application dies on Windows as soon as it tries to read data back from the WriteAheadLog.
- When the driver recovers from a failure, it reads data from the WriteAheadLog.
- When a block does not exist in the BlockManager, the data is read from the WriteAheadLog.
- The exception occurs in Hadoop code.

The detailed analysis follows. Looking forward to your reply.

4.1 What Happened?

I built an application using Spark Streaming. When I tested its fault-tolerance feature, which relies on Spark's WriteAheadLog mechanism, on Windows (yes, Windows), I found that the application could not recover from a failure. What makes it strange is that the application works well until I kill the driver manually; the restarted driver then fails. I have read the source code, and the reason is as follows. A sketch of the driver setup is shown after the environment details.

4.2 Environment

- Windows Server 2012
- Spark: 1.6.1
- Hadoop: 2.6.3
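For context, this is roughly how the driver exercises the failing path. It is a minimal sketch using the standard Java streaming API; my actual application uses the C# bindings (hence the CSharpRDD frames in the log below), and the host, port, and class name here are placeholders:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class WalRecoveryDemo {
      private static final String CHECKPOINT = "hdfs://CNAZDEV02:19000/checkpoint";

      private static JavaStreamingContext createContext() {
        SparkConf conf = new SparkConf().setAppName("WalRecoveryDemo");
        // Persist received blocks to the write-ahead log so they can be
        // re-read after a driver failure.
        conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint(CHECKPOINT);
        JavaDStream<String> lines = jssc.socketTextStream(
            "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER());
        lines.count().print(); // the count() job is what fails after recovery
        return jssc;
      }

      public static void main(String[] args) throws Exception {
        // On restart this recovers from the checkpoint; the recovered
        // batches are then read back from the write-ahead log.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
            CHECKPOINT,
            new Function0<JavaStreamingContext>() {
              @Override
              public JavaStreamingContext call() {
                return createContext();
              }
            });
        jssc.start();
        jssc.awaitTermination();
      }
    }

Killing the driver while data is flowing, then letting YARN restart it, is how I trigger the failure below.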
4.3 Full Debug Log

16/08/03 12:26:14 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 4.0 (TID 85) on executor cnazdev04.fareast.corp.microsoft.com: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://CNAZDEV02:19000/checkpoint/receivedData/0/log-1470197944406-1470198004406,189,59)) [duplicate 3]
16/08/03 12:26:14 ERROR scheduler.TaskSetManager: Task 0 in stage 4.0 failed 4 times; aborting job
16/08/03 12:26:14 INFO cluster.YarnClusterScheduler: Cancelling stage 4
16/08/03 12:26:14 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
16/08/03 12:26:14 INFO scheduler.DAGScheduler: ResultStage 4 (count at NativeMethodAccessorImpl.java:-2) failed in 5.624 s
16/08/03 12:26:14 INFO scheduler.DAGScheduler: Job 3 failed: count at NativeMethodAccessorImpl.java:-2, took 5.691028 s
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:156)
    at org.apache.spark.api.csharp.CSharpBackendHandler.handleBackendRequest(CSharpBackendHandler.scala:103)
    at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:30)
    at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:27)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 85, cnazdev04.fareast.corp.microsoft.com): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://CNAZDEV02:19000/checkpoint/receivedData/0/log-1470197944406-1470198004406,189,59)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:143)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:167)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:167)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:167)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.api.csharp.CSharpRDD.compute(CSharpRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Pathname /C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed from C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:227)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForReceiver(WriteAheadLogUtils.scala:110)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:138)

4.4 Key Point

Caused by: java.lang.IllegalArgumentException: Pathname /C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed from C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed is not a valid DFS filename.

4.5 Reason & Analysis

As the debug log and key point show, the error occurs in org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(), i.e. in Hadoop code. Here is its source:

    private String getPathName(Path file) {
      checkPath(file);
      String result = file.toUri().getPath();
      if (!DFSUtil.isValidName(result)) {
        throw new IllegalArgumentException("Pathname " + result + " from " +
            file + " is not a valid DFS filename.");
      }
      return result;
    }

The parameter "Path file" is, in this case, C:/tmp/hadoop (abbreviated; the full container path appears in the key point above). The function first calls checkPath(file), which passes without issue. Then a temporary variable "result" is produced by file.toUri().getPath(); in this case result is /C:/tmp/hadoop, since on Windows, Path prepends a slash to paths that start with a drive letter. So far, everything is fine. The function then validates result with DFSUtil.isValidName. Here is the source code of DFSUtil.isValidName:

    /**
     * Whether the pathname is valid. Currently prohibits relative paths,
     * names which contain a ":" or "//", or other non-canonical paths.
     */
    public static boolean isValidName(String src) {
      // Path must be absolute.
      if (!src.startsWith(Path.SEPARATOR)) {
        return false;
      }

      // Check for ".." "." ":" "/"
      String[] components = StringUtils.split(src, '/');
      for (int i = 0; i < components.length; i++) {
        String element = components[i];
        if (element.equals(".") ||
            (element.indexOf(":") >= 0) ||
            (element.indexOf("/") >= 0)) {
          return false;
        }
        ...

From the comments we can tell that src is treated as invalid whenever any path component contains ":". However, /C:/tmp/hadoop is a legal path on Windows, so isValidName returns false for it and getPathName throws the IllegalArgumentException shown above. Apparently, this code does not take Windows paths into account.

Sincerely,
Zixuan Zhang
Institute of Software, Chinese Academy of Sciences
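P.S. The failing check can be reproduced in isolation. Below is a minimal sketch, assuming hadoop-hdfs 2.6.x on the classpath and a Windows machine; the class name is mine, while DFSUtil.isValidName is the method quoted above:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class DfsNameCheck {
      public static void main(String[] args) {
        Path file = new Path("C:/tmp/hadoop");
        // On Windows, Path prepends a slash to drive-letter paths,
        // so the URI path becomes "/C:/tmp/hadoop".
        String result = file.toUri().getPath();
        System.out.println(result);
        // The "C:" component contains ':', so the check rejects it
        // and this prints "false".
        System.out.println(DFSUtil.isValidName(result));
      }
    }

With isValidName returning false, getPathName throws exactly the IllegalArgumentException shown in the key point.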