Yep. It seems like you're right. Let me dig into that and get back to you with an answer. This was definitely working after the code drop and README re-write (I tested both mvn exec:exec and running on cluster).
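In the meantime, one thing worth ruling out is shell tilde-expansion: whether '~' is expanded inside a word like -Dinput=~/... depends on the shell and its mode, and the JVM itself never expands it, so an unexpanded '~' would make TextIO read/write a literal "~/..." path. A minimal sketch of the same invocation with $HOME instead (paths follow the layout in the mail below; adjust as needed):

  # Same WordCount invocation as in the thread, but with $HOME so the shell
  # expands both paths before Maven and the JVM ever see them.
  cd ~/git/incubator-beam/runners/spark
  mvn exec:exec \
    -DmainClass=com.google.cloud.dataflow.examples.WordCount \
    -Dinput=$HOME/test/beam/input/kinglear.txt \
    -Doutput=$HOME/test/beam/output \
    -Drunner=SparkPipelineRunner \
    -DsparkMaster=local

  # Then check whether the shards actually carry data:
  wc -c $HOME/test/beam/output-*-of-*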
Thanks,
Amit

On Tue, Apr 12, 2016 at 1:52 PM Jean-Baptiste Onofré <[email protected]> wrote:
> Hi,
>
> thanks for the update.
>
> Did you try -Doutput=~/test/beam/output (or is it a copy/paste mistake)?
>
> Let me try to reproduce and check the pipeline definition.
>
> Thanks,
> Regards
> JB
>
> On 04/12/2016 12:41 PM, Jianfeng Qian wrote:
> > Hi,
> >
> > I just started using Beam.
> >
> > I installed Scala 2.11, Hadoop 2.7.2, and Spark 1.6.1, and started Hadoop and Spark.
> >
> > I downloaded and built Beam.
> >
> > I downloaded the input file with:
> >
> > curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /home/jeff/test/beam/input/kinglear.txt
> >
> > jeff@T:~/test/beam$ ls -l input
> > total 184
> > -rw-rw-r-- 1 jeff jeff 185965 Apr 11 17:16 kinglear.txt
> >
> > *Then I ran in local mode:*
> >
> > jeff@T:~/git/incubator-beam/runners/spark$ mvn exec:exec \
> >   -DmainClass=com.google.cloud.dataflow.examples.WordCount \
> >   -Dinput=~/test/beam/input/kinglear.txt -Doutput=/test/beam/output \
> >   -Drunner=SparkPipelineRunner \
> >   -DsparkMaster=local
> >
> > *However, the result file is empty. Has anyone run into the same problem?*
> >
> > jeff@T:~/test/beam$ ls -l
> > total 4
> > drwxrwxr-x 2 jeff jeff 4096 Apr 12 17:32 input
> > -rw-r--r-- 1 jeff jeff 0 Apr 12 18:20 output-00000-of-00001
> > -rw-r--r-- 1 jeff jeff 0 Apr 12 18:20 _SUCCESS
> >
> > The local-mode log is as follows:
> >
> > [INFO] Scanning for projects...
> > [WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
> > [WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
> > [INFO]
> > [INFO] ------------------------------------------------------------------------
> > [INFO] Building Apache Beam :: Runners :: Spark 0.1.0-incubating-SNAPSHOT
> > [INFO] ------------------------------------------------------------------------
> > [WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
> > [WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
> > [INFO]
> > [INFO] --- exec-maven-plugin:1.4.0:exec (default-cli) @ spark-runner ---
> > log4j:WARN No appenders could be found for logger (com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory).
> > log4j:WARN Please initialize the log4j system properly.
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> > Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> > 16/04/12 18:20:29 INFO SparkContext: Running Spark version 1.5.2
> > 16/04/12 18:20:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> > 16/04/12 18:20:29 WARN Utils: Your hostname, T resolves to a loopback address: 127.0.1.1; using 192.168.1.119 instead (on interface wlp3s0)
> > 16/04/12 18:20:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> > 16/04/12 18:20:29 INFO SecurityManager: Changing view acls to: jeff
> > 16/04/12 18:20:29 INFO SecurityManager: Changing modify acls to: jeff
> > 16/04/12 18:20:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jeff); users with modify permissions: Set(jeff)
> > 16/04/12 18:20:31 INFO Slf4jLogger: Slf4jLogger started
> > 16/04/12 18:20:31 INFO Remoting: Starting remoting
> > 16/04/12 18:20:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:40821]
> > 16/04/12 18:20:32 INFO Utils: Successfully started service 'sparkDriver' on port 40821.
> > 16/04/12 18:20:32 INFO SparkEnv: Registering MapOutputTracker
> > 16/04/12 18:20:32 INFO SparkEnv: Registering BlockManagerMaster
> > 16/04/12 18:20:32 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-12aaa425-2b1b-4182-865d-5d231ee10cda
> > 16/04/12 18:20:33 INFO MemoryStore: MemoryStore started with capacity 441.7 MB
> > 16/04/12 18:20:33 INFO HttpFileServer: HTTP File server directory is /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b/httpd-bf2fd43c-b3c9-4840-bc72-e2f16965df9a
> > 16/04/12 18:20:33 INFO HttpServer: Starting HTTP Server
> > 16/04/12 18:20:33 INFO Utils: Successfully started service 'HTTP file server' on port 46840.
> > 16/04/12 18:20:33 INFO SparkEnv: Registering OutputCommitCoordinator
> > 16/04/12 18:20:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> > 16/04/12 18:20:33 INFO SparkUI: Started SparkUI at http://192.168.1.119:4040
> > 16/04/12 18:20:34 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
> > 16/04/12 18:20:34 INFO Executor: Starting executor ID driver on host localhost
> > 16/04/12 18:20:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43549.
> > 16/04/12 18:20:34 INFO NettyBlockTransferService: Server created on 43549
> > 16/04/12 18:20:34 INFO BlockManagerMaster: Trying to register BlockManager
> > 16/04/12 18:20:34 INFO BlockManagerMasterEndpoint: Registering block manager localhost:43549 with 441.7 MB RAM, BlockManagerId(driver, localhost, 43549)
> > 16/04/12 18:20:34 INFO BlockManagerMaster: Registered BlockManager
> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'ReadLines'
> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Skipping 'ReadLines/Read'; already in composite transform.
> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'ReadLines'
> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Evaluating ReadLines [TextIO.Read]
> > 16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(110248) called with curMem=0, maxMem=463176990
> > 16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 441.6 MB)
> > 16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(10056) called with curMem=110248, maxMem=463176990
> > 16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 441.6 MB)
> > 16/04/12 18:20:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43549 (size: 9.8 KB, free: 441.7 MB)
> > 16/04/12 18:20:35 INFO SparkContext: Created broadcast 0 from textFile at TransformTranslator.java:471
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating ParDo(ExtractWords)
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating Init [AnonymousParDo]
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'WordCount.CountWords/Count.PerElement/Count.PerKey'
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows/ParDo(ReifyTimestampAndWindows)'; already in composite transform.
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly'; already in composite transform.
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.SortValuesByTimestamp/AnonymousParDo'; already in composite transform.
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow/ParDo(GroupAlsoByWindowsViaOutputBuffer)'; already in composite transform.
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo'; already in composite transform.
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'WordCount.CountWords/Count.PerElement/Count.PerKey'
> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating Count.PerKey [Combine.PerKey]
> > 16/04/12 18:20:36 INFO FileInputFormat: Total input paths to process : 1
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating Map [AnonymousParDo]
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'WriteCounts'
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Create.Values'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Initialize'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/View.AsSingleton/View.CreatePCollectionView'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/WriteBundles'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Window.Into()'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/View.AsIterable/View.CreatePCollectionView'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Finalize'; already in composite transform.
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'WriteCounts'
> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating WriteCounts [TextIO.Write]
> > 16/04/12 18:20:36 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
> > 16/04/12 18:20:36 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at TransformTranslator.java:660
> > 16/04/12 18:20:36 INFO DAGScheduler: Registering RDD 6 (mapToPair at TransformTranslator.java:304)
> > 16/04/12 18:20:36 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile at TransformTranslator.java:660) with 1 output partitions
> > 16/04/12 18:20:36 INFO DAGScheduler: Final stage: ResultStage 1(saveAsNewAPIHadoopFile at TransformTranslator.java:660)
> > 16/04/12 18:20:36 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
> > 16/04/12 18:20:36 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
> > 16/04/12 18:20:36 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at mapToPair at TransformTranslator.java:304), which has no missing parents
> > 16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(10368) called with curMem=120304, maxMem=463176990
> > 16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.1 KB, free 441.6 MB)
> > 16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(5001) called with curMem=130672, maxMem=463176990
> > 16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KB, free 441.6 MB)
> > 16/04/12 18:20:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43549 (size: 4.9 KB, free: 441.7 MB)
> > 16/04/12 18:20:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
> > 16/04/12 18:20:36 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[6] at mapToPair at TransformTranslator.java:304)
> > 16/04/12 18:20:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> > 16/04/12 18:20:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2142 bytes)
> > 16/04/12 18:20:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> > 16/04/12 18:20:36 INFO HadoopRDD: Input split: file:/home/jeff/test/beam/input/kinglear.txt:0+185965
> > 16/04/12 18:20:36 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> > 16/04/12 18:20:36 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> > 16/04/12 18:20:36 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> > 16/04/12 18:20:36 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> > 16/04/12 18:20:36 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
> > 16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 4316 bytes result sent to driver
> > 16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 477 ms on localhost (1/1)
> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> > 16/04/12 18:20:37 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at TransformTranslator.java:304) finished in 0.511 s
> > 16/04/12 18:20:37 INFO DAGScheduler: looking for newly runnable stages
> > 16/04/12 18:20:37 INFO DAGScheduler: running: Set()
> > 16/04/12 18:20:37 INFO DAGScheduler: waiting: Set(ResultStage 1)
> > 16/04/12 18:20:37 INFO DAGScheduler: failed: Set()
> > 16/04/12 18:20:37 INFO DAGScheduler: Missing parents for ResultStage 1: List()
> > 16/04/12 18:20:37 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:486), which is now runnable
> > 16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(54976) called with curMem=135673, maxMem=463176990
> > 16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 53.7 KB, free 441.5 MB)
> > 16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(19334) called with curMem=190649, maxMem=463176990
> > 16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 18.9 KB, free 441.5 MB)
> > 16/04/12 18:20:37 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:43549 (size: 18.9 KB, free: 441.7 MB)
> > 16/04/12 18:20:37 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
> > 16/04/12 18:20:37 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:486)
> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
> > 16/04/12 18:20:37 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1901 bytes)
> > 16/04/12 18:20:37 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
> > 16/04/12 18:20:37 INFO deprecation: mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
> > 16/04/12 18:20:37 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
> > 16/04/12 18:20:37 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
> > 16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
> > 16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
> > 16/04/12 18:20:37 INFO FileOutputCommitter: Saved output of task 'attempt_201604121820_0014_r_000000_0' to file:/home/jeff/test/beam/_temporary/0/task_201604121820_0014_r_000000
> > 16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1326 bytes result sent to driver
> > 16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 126 ms on localhost (1/1)
> > 16/04/12 18:20:37 INFO DAGScheduler: ResultStage 1 (saveAsNewAPIHadoopFile at TransformTranslator.java:660) finished in 0.127 s
> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
> > 16/04/12 18:20:37 INFO DAGScheduler: Job 0 finished: saveAsNewAPIHadoopFile at TransformTranslator.java:660, took 0.881797 s
> > 16/04/12 18:20:37 INFO SparkPipelineRunner: Pipeline execution complete.
> > 16/04/12 18:20:37 INFO SparkContext: Invoking stop() from shutdown hook
> > 16/04/12 18:20:37 INFO SparkUI: Stopped Spark web UI at http://192.168.1.119:4040
> > 16/04/12 18:20:37 INFO DAGScheduler: Stopping DAGScheduler
> > 16/04/12 18:20:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> > 16/04/12 18:20:37 INFO MemoryStore: MemoryStore cleared
> > 16/04/12 18:20:37 INFO BlockManager: BlockManager stopped
> > 16/04/12 18:20:37 INFO BlockManagerMaster: BlockManagerMaster stopped
> > 16/04/12 18:20:37 INFO SparkContext: Successfully stopped SparkContext
> > 16/04/12 18:20:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> > 16/04/12 18:20:37 INFO ShutdownHookManager: Shutdown hook called
> > 16/04/12 18:20:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b
> > [INFO] ------------------------------------------------------------------------
> > [INFO] BUILD SUCCESS
> > [INFO] ------------------------------------------------------------------------
> > [INFO] Total time: 14.206 s
> > [INFO] Finished at: 2016-04-12T18:20:37+08:00
> > [INFO] Final Memory: 23M/167M
> > [INFO] ------------------------------------------------------------------------
> >
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
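P.S. One detail that stands out in the log: the input split was read (kinglear.txt:0+185965), but the result stage reports "Getting 0 non-empty blocks out of 1 blocks", which suggests the shuffle feeding the write was already empty, i.e. no records survived the count stage, rather than a write failure. A quick cross-check, assuming the exec configuration forwards -Drunner for other runner names the same way it does for SparkPipelineRunner, is to run the same example on the SDK's in-process runner:

  # Hypothetical cross-check on the old SDK's DirectPipelineRunner. If this
  # produces non-empty output from the same input, the problem is in the
  # Spark translation rather than in WordCount itself. (-DsparkMaster is
  # kept only to satisfy the exec configuration; the direct runner does not
  # use it.)
  mvn exec:exec \
    -DmainClass=com.google.cloud.dataflow.examples.WordCount \
    -Dinput=$HOME/test/beam/input/kinglear.txt \
    -Doutput=$HOME/test/beam/direct-output \
    -Drunner=DirectPipelineRunner \
    -DsparkMaster=local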
