> On 06 April 2017 at 19:53 Dan Halperin <[email protected]> wrote:
> 
>     Hi Richard,
> 
>     Can you share a little more info about your environment? Here's a 
> smattering of questions for which answers may be useful.
> 
>     * What runner are you using? 
> 

I don't specify a runner, so I believe it uses the DirectRunner.

>     * What version of the SDK?
> 

Apache Beam version is 0.6.0 (beam-sdks-java-core and beam-runners-direct-java)

>     * Does this reproduce in the DirectRunner?
> 

I believe the problem occurs when running with the DirectRunner.

>     * Can you share a full reproduction? (e.g., in a github gist)?
> 

JDK: 1.8.0_121

Scala: 2.12.1

sbt: 0.13.13


Below is the sample XML file:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<customers>
  <customer id="100">
    <age>33</age>
    <name>John Smith</name>
  </customer>
</customers>


The sample record class:

import javax.xml.bind.annotation.{XmlAttribute, XmlElement, XmlRootElement}

@XmlRootElement
class Customer {
  private var name: String = ""
  private var age: Int = 0
  private var id: Int = -1

  def getName(): String = name
  @XmlElement
  def setName(name: String): Unit = this.name = name

  def getAge(): Int = age
  @XmlElement
  def setAge(age: Int): Unit = this.age = age

  def getId(): Int = id
  @XmlAttribute
  def setId(id: Int): Unit = this.id = id
}
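One way to rule out the record class is to check it against the sample data with plain JAXB, outside Beam (the annotations above are JAXB annotations, and XmlSource maps each record element through the record class with JAXB). A minimal sketch, assuming JDK 8 as in this thread (it still bundles `javax.xml.bind`; newer JDKs need the JAXB API and runtime on the classpath). `JaxbCheck`, `parse` and `record` are hypothetical names, and the record string is just the `<customer>` element from the sample file.

```scala
import java.io.StringReader
import javax.xml.bind.JAXBContext
import javax.xml.bind.annotation.{XmlAttribute, XmlElement, XmlRootElement}

// Copy of the Customer class above so this file compiles on its own.
// JAXB binds the public getX/setX pairs; the annotations sit on the setters.
@XmlRootElement
class Customer {
  private var name: String = ""
  private var age: Int = 0
  private var id: Int = -1

  def getName(): String = name
  @XmlElement
  def setName(name: String): Unit = this.name = name

  def getAge(): Int = age
  @XmlElement
  def setAge(age: Int): Unit = this.age = age

  def getId(): Int = id
  @XmlAttribute
  def setId(id: Int): Unit = this.id = id
}

object JaxbCheck {
  // A single <customer> record taken from the sample file (hypothetical test input).
  val record: String =
    """<customer id="100"><age>33</age><name>John Smith</name></customer>"""

  // Unmarshal one record element into a Customer using JAXB directly.
  def parse(xml: String): Customer = {
    val unmarshaller = JAXBContext.newInstance(classOf[Customer]).createUnmarshaller()
    unmarshaller.unmarshal(new StringReader(xml)).asInstanceOf[Customer]
  }

  def main(args: Array[String]): Unit = {
    val c = parse(record)
    println(s"id=${c.getId()} name=${c.getName()} age=${c.getAge()}")
  }
}
```

If this prints the values from the sample file, the JAXB mapping itself is probably not what makes the pipeline spin.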

The pipeline code:


import java.io.File
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Read, Write, XmlSink, XmlSource}
import org.apache.beam.sdk.options.PipelineOptionsFactory

val options = PipelineOptionsFactory.create
val p = Pipeline.create(options)

val source = XmlSource.from[Customer](new File("customers.xml").toPath.toString)
  .withRootElement("customers")
  .withRecordElement("customer")
  .withRecordClass(classOf[Customer])

val sink = XmlSink.write()
  .toFilenamePrefix("xmlout")
  .ofRecordClass(classOf[Customer])
  .withRootElement("customers")

p.apply(Read.from(source)).apply(Write.to(sink))

p.run.waitUntilFinish
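For comparison, a Beam-free sketch of what the source and sink above are configured to do: stream through the document with StAX, unmarshal every `customer` element into `Customer`, then marshal the records back out as fragments inside a `customers` root. This is not how XmlSource/XmlSink are implemented internally; `RoundTrip`, `readRecords` and `writeRecords` are hypothetical names, the input is the sample file inlined as a string, and it assumes JDK 8's bundled JAXB and StAX.

```scala
import java.io.{StringReader, StringWriter}
import javax.xml.bind.{JAXBContext, Marshaller}
import javax.xml.bind.annotation.{XmlAttribute, XmlElement, XmlRootElement}
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}
import scala.collection.mutable.ListBuffer

// Copy of the Customer class from the thread so this file compiles on its own.
@XmlRootElement
class Customer {
  private var name: String = ""
  private var age: Int = 0
  private var id: Int = -1

  def getName(): String = name
  @XmlElement
  def setName(name: String): Unit = this.name = name

  def getAge(): Int = age
  @XmlElement
  def setAge(age: Int): Unit = this.age = age

  def getId(): Int = id
  @XmlAttribute
  def setId(id: Int): Unit = this.id = id
}

object RoundTrip {
  // The sample file from the thread, inlined (hypothetical stand-in for customers.xml).
  val input: String =
    """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      |<customers>
      |<customer id="100">
      |<age>33</age>
      |<name>John Smith</name>
      |</customer>
      |</customers>""".stripMargin

  private val context = JAXBContext.newInstance(classOf[Customer])

  // Read side: walk the document and unmarshal every <recordElement> into a Customer.
  def readRecords(xml: String, recordElement: String): List[Customer] = {
    val reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml))
    val unmarshaller = context.createUnmarshaller()
    val records = ListBuffer.empty[Customer]
    while (reader.hasNext) {
      if (reader.getEventType == XMLStreamConstants.START_ELEMENT &&
          reader.getLocalName == recordElement) {
        // Consumes the whole record element; the cursor moves past its end tag.
        records += unmarshaller.unmarshal(reader, classOf[Customer]).getValue
      } else {
        reader.next()
      }
    }
    reader.close()
    records.toList
  }

  // Write side: marshal each record as a fragment (no XML declaration) inside <rootElement>.
  def writeRecords(records: Seq[Customer], rootElement: String): String = {
    val marshaller = context.createMarshaller()
    marshaller.setProperty(Marshaller.JAXB_FRAGMENT, java.lang.Boolean.TRUE)
    val out = new StringWriter()
    out.write(s"<$rootElement>")
    records.foreach(r => marshaller.marshal(r, out))
    out.write(s"</$rootElement>")
    out.toString
  }

  def main(args: Array[String]): Unit = {
    val records = readRecords(input, "customer")
    println(s"read ${records.size} record(s)")
    println(writeRecords(records, "customers"))
  }
}
```

If a round trip like this finishes immediately on the same file, that would suggest the hang lies on the pipeline or runner side rather than in the data or the record class.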


>     * What is happening on the machine(s) executing the job? Is there high 
> CPU? Is the disk active? Etc.
> 

CPU usage stays at 99.x% the whole time the Java process is running (as reported by top):

7624 user      20   0 2837.6m 582.5m  23.5m S 99.3 11.4   2:42.11 java

Monitoring with iotop shows that disk I/O is mostly performed by system processes such as kworker. Only once or twice did I see the Java process (the only user process running on the machine) doing disk I/O:

Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID  PRIO  USER  DISK READ  DISK WRITE  SWAPIN  IO>     COMMAND
7720 be/4  root  0.00 B/s   0.00 B/s    0.00 %  0.01 %  [kworker/0:2]

Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID  PRIO  USER  DISK READ  DISK WRITE  SWAPIN  IO>     COMMAND
7626 be/4  user  0.00 B/s   11.72 K/s   0.00 %  0.00 %  java -Xms~h.jar test
7633 be/4  user  0.00 B/s   3.91 K/s    0.00 %  0.00 %  java -Xms~h.jar test


>     Thanks,
>     Dan
> 
>     On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson <[email protected]> wrote:
> 
> >         I am testing Apache Beam to read/write XML files, but I have run 
> > into a problem: even though the code just reads a single XML file and writes 
> > it out without any transformation, the process seems to hang indefinitely. 
> > The output looks like this:
> > 
> >         [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> >         [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
> > 
> > 
> >         The code basically does the following:
> > 
> >         val options = PipelineOptionsFactory.create
> >         val p = Pipeline.create(options)
> > 
> > 
> >         val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString).withRootElement("RootE").withRecordElement("Record").withRecordClass(classOf[Record])
> > 
> >         p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFilenamePrefix("xml").ofRecordClass(classOf[Record]).withRootElement("RootE")))
> > 
> >         p.run.waitUntilFinish
> > 
> > 
> >         What might be missing from my program?
> > 
> >         Thanks
> > 
> 
