Hi Richard,

I wonder if you're being hit by
https://issues.apache.org/jira/browse/BEAM-1309 -- namely, that the entire
/tmp directory may be getting traversed.

As a sanity check, can you try moving your test file into a more specific
folder, like /tmp/beam/test_input/input.xml?
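
In case it helps, here is a minimal sketch of that sanity check in Scala,
using only java.nio.file from the JDK. The object name MoveTestInput and the
helper stage are made up for illustration, and it copies rather than moves so
the original file stays in place; the paths are the ones mentioned in this
thread.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object MoveTestInput {
  // Hypothetical helper: copies `source` into `targetDir`, creating the
  // directory (and any missing parents) first, and returns the new path.
  def stage(source: Path, targetDir: Path): Path = {
    Files.createDirectories(targetDir)
    val target = targetDir.resolve(source.getFileName)
    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING)
  }

  def main(args: Array[String]): Unit = {
    // Assumes /tmp/input/my.xml exists; Files.copy throws
    // NoSuchFileException otherwise.
    val staged = stage(Paths.get("/tmp/input/my.xml"),
                       Paths.get("/tmp/beam/test_input"))
    println(s"Staged input at: $staged")
  }
}
```

The returned path (as a string) is what you would then pass to
XmlSource.from in place of the original /tmp/input/my.xml.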

If this resolves your issue, it's a good argument for prioritizing fixing
that issue ;)

Dan

On Fri, Apr 7, 2017 at 5:37 AM, Richard Hanson <[email protected]> wrote:

>
> On 06 April 2017 at 19:53 Dan Halperin <[email protected]> wrote:
>
> Hi Richard,
>
> Can you share a little more info about your environment? Here's a
> smattering of questions for which answers may be useful.
>
> * What runner are you using?
>
> I don't specify any runner, so I believe it should be using the direct runner.
>
> * What version of the SDK?
>
> Apache Beam version is 0.6.0 (beam-sdks-java-core
> and beam-runners-direct-java)
>
> * Does this reproduce in the DirectRunner?
>
> I believe this problem happens while running the DirectRunner.
>
> * Can you share a full reproduction? (e.g., in a github gist)?
>
> JDK: 1.8.0_121
>
> Scala: 2.12.1
>
> sbt: 0.13.13
>
>
> Below is the sample xml file
>
> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> <customers>
> <customer id="100">
> <age>33</age>
> <name>John Smith</name>
> </customer>
> </customers>
>
>
> The sample record object.
>
> @XmlRootElement
> class Customer {
>
> private var name: String = ""
>
> private var age: Int = 0
>
> private var id: Int = -1
>
> def getName():String = name
>
> @XmlElement
> def setName(name: String) = this.name = name
>
> def getAge(): Int = age
>
> @XmlElement
> def setAge(age: Int) = this.age = age
>
> def getId(): Int = id
>
> @XmlAttribute
> def setId(id: Int) = this.id = id
>
> }
>
> Pipeline procedure of the code.
>
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
> val source = XmlSource.from[Customer](
> new File("customers.xml").toPath.toString
> ).withRootElement("customers").withRecordElement("customer").
> withRecordClass(classOf[Customer])
>
> val sink = XmlSink.write().toFilenamePrefix("xmlout").
> ofRecordClass(classOf[Customer]).
> withRootElement("customers")
>
> p.apply(Read.from(source)).apply(Write.to(sink))
>
> p.run.waitUntilFinish
>
>
> * What is happening on the machine(s) executing the job? Is there high
> CPU? Is the disk active? Etc.
>
> CPU usage is high and stays at 99.x% while the Java process is
> executing (as seen with the top command).
>
> 7624 user      20   0 2837.6m 582.5m  23.5m S 99.3 11.4   2:42.11 java
>
> Monitoring with iotop shows that disk I/O is mostly performed by system
> processes, e.g. kworker. I only saw the Java process (the only
> user process running on the machine) doing disk I/O once or twice.
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
>
>
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
> 7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]
>
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
>
> 7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
> 7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
>
> Thanks,
> Dan
>
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson <[email protected]>
> wrote:
>
> I am testing Apache Beam to read/write XML files, but I've run into a
> problem: even when the code just reads a single XML file and writes it
> out without doing any transformation, the process seems to hang
> indefinitely. The output looks like this:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
>
> The code basically does the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString).
>   withRootElement("RootE").withRecordElement("Record").
>   withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml)).apply(Write.to(
>   XmlSink.write().toFilenamePrefix("xml").ofRecordClass(classOf[Record]).
>     withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
>
> What part may be missing in my program?
>
> Thanks
>
>
>
