Hi Richard,

I wonder if you're being hit by https://issues.apache.org/jira/browse/BEAM-1309, namely that the entire /tmp directory might be getting traversed.
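To get a feel for how much work a full walk of /tmp would involve under that hypothesis, you can count what sits there. The Scala sketch below is an editorial illustration, not Beam code: the object name TmpTraversalCost is hypothetical, and it uses only java.nio.file to walk a directory tree, skipping anything it cannot read instead of failing.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, Paths, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

// Hypothetical diagnostic (not part of Beam): counts what a full walk of a
// directory tree would visit, to gauge the cost of traversing e.g. /tmp.
object TmpTraversalCost {

  /** Returns (regular files, directories) under `root`. Entries that cannot
    * be read are skipped rather than aborting the whole walk. */
  def countEntries(root: Path): (Long, Long) = {
    var files = 0L
    var dirs = 0L
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes): FileVisitResult = {
        dirs += 1 // only reached for directories that could be opened
        FileVisitResult.CONTINUE
      }
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        if (attrs.isRegularFile) files += 1
        FileVisitResult.CONTINUE
      }
      // e.g. permission denied on another user's private directory
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
        FileVisitResult.CONTINUE
      // the default implementation rethrows errors hit while listing a directory
      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult =
        FileVisitResult.CONTINUE
    })
    (files, dirs)
  }

  def main(args: Array[String]): Unit = {
    val root = Paths.get(if (args.nonEmpty) args(0) else "/tmp")
    val (files, dirs) = countEntries(root)
    println(s"$root: $files regular files in $dirs directories")
  }
}
```

A large count would not prove the BEAM-1309 hypothesis, but it would make a long, CPU-bound match step more plausible.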
As a sanity check, can you try moving your test file into a more specific folder, like /tmp/beam/test_input/input.xml? If this resolves your issue, it's a good argument for prioritizing a fix for that issue ;)

Dan

On Fri, Apr 7, 2017 at 5:37 AM, Richard Hanson wrote:

> On 06 April 2017 at 19:53 Dan Halperin wrote:
>
> Hi Richard,
>
> Can you share a little more info about your environment? Here's a
> smattering of questions for which answers may be useful.
>
> * What runner are you using?
>
> I don't specify a runner, so I believe it uses the DirectRunner.
>
> * What version of the SDK?
>
> Apache Beam 0.6.0 (beam-sdks-java-core and beam-runners-direct-java).
>
> * Does this reproduce in the DirectRunner?
>
> I believe this problem happens while running on the DirectRunner.
>
> * Can you share a full reproduction? (e.g., in a GitHub gist)
>
> JDK: 1.8.0_121
> Scala: 2.12.1
> sbt: 0.13.13
>
> Below is the sample XML file:
>
>   <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
>   <customers>
>     <customer id="100">
>       <age>33</age>
>       <name>John Smith</name>
>     </customer>
>   </customers>
>
> The sample record object:
>
>   @XmlRootElement
>   class Customer {
>
>     private var name: String = ""
>     private var age: Int = 0
>     private var id: Int = -1
>
>     def getName(): String = name
>
>     @XmlElement
>     def setName(name: String) = this.name = name
>
>     def getAge(): Int = age
>
>     @XmlElement
>     def setAge(age: Int) = this.age = age
>
>     def getId(): Int = id
>
>     @XmlAttribute
>     def setId(id: Int) = this.id = id
>   }
>
> The pipeline code:
>
>   val options = PipelineOptionsFactory.create
>   val p = Pipeline.create(options)
>
>   val source = XmlSource.from[Customer](
>     new File("customers.xml").toPath.toString
>   ).withRootElement("customers").withRecordElement("customer").
>     withRecordClass(classOf[Customer])
>
>   val sink = XmlSink.write().toFilenamePrefix("xmlout").
>     ofRecordClass(classOf[Customer]).
>     withRootElement("customers")
>
>   p.apply(Read.from(source)).apply(Write.to(sink))
>
>   p.run.waitUntilFinish
>
> * What is happening on the machine(s) executing the job? Is there high
> CPU? Is the disk active? Etc.
>
> CPU usage stays at 99.x% while the Java process is executing (checked
> with top):
>
>   7624 user 20 0 2837.6m 582.5m 23.5m S 99.3 11.4 2:42.11 java
>
> Monitoring with iotop shows that disk I/O is mostly performed by system
> processes, e.g. kworker. Only once or twice did I see the Java process
> (the only user process running on the machine) doing disk I/O.
>
>   Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
>   Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
>   TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
>   7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]
>
>   Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
>   Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
>   TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
>   7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
>   7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
> Thanks,
> Dan
>
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson wrote:
>
> I am testing Apache Beam to read/write XML files, but I've hit a
> problem: even though the code just reads a single XML file and writes it
> out without any transformation, the process seems to hang indefinitely.
> The output looks like this:
>
>   [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
>   [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
> The code basically does the following:
>
>   val options = PipelineOptionsFactory.create
>   val p = Pipeline.create(options)
>
>   val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString).
>     withRootElement("RootE").withRecordElement("Record").
>     withRecordClass(classOf[Record])
>
>   p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFilenamePrefix("xml").
>     ofRecordClass(classOf[Record]).withRootElement("RootE")))
>
>   p.run.waitUntilFinish
>
> What might be missing from my program?
>
> Thanks
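Dan's sanity check (moving the input into a dedicated folder such as /tmp/beam/test_input/input.xml) can be scripted so it is easy to repeat. This is a minimal sketch using only java.nio.file; the helper name StageInput.stage is hypothetical, and it does not call Beam. It only produces the path you would then hand to XmlSource.from in place of /tmp/input/my.xml.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Hypothetical helper for Dan's sanity check: stage the input XML in its own
// directory so the pipeline reads from a path with nothing else around it.
object StageInput {

  /** Copies `source` to `stagingDir/fileName`, creating `stagingDir` (and any
    * missing parents) first, and returns the absolute path of the copy. */
  def stage(source: Path, stagingDir: Path, fileName: String): Path = {
    Files.createDirectories(stagingDir)
    val target = stagingDir.resolve(fileName)
    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING)
    target.toAbsolutePath
  }

  def main(args: Array[String]): Unit = {
    val staged = stage(
      Paths.get("/tmp/input/my.xml"),    // original input from the thread
      Paths.get("/tmp/beam/test_input"), // folder Dan suggests
      "input.xml")
    // Pass staged.toString to XmlSource.from in place of the original path.
    println(staged)
  }
}
```

If the pipeline finishes promptly when reading the staged copy but still hangs on the original path, that points toward the traversal issue rather than the XmlSource/XmlSink configuration.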
