I'm trying to parse OpenPGP key server dumps.  Throughput is not too
bad so far, but I'd like to speed things up by introducing
parallelism.

The dumps are split into several files (each about 40 MB large, with a
few thousand keys).  The toy version I have so far does not need any
coordination between the processing of individual files.

This works quite well when running the sequential version in parallel,
using GNU parallel.  With GOMAXPROCS=1, serial execution time is
reduced from 141 seconds to 31 seconds.  This is roughly what I would
expect from six cores with hyperthreading and use of GNU parallel
(which has some overhead of its own in this scenario).

However, the version with built-in parallelism runs in 55 seconds, so
only half as fast as the GNU parallel approach.  I would have expected
it to fare better compared to that.  The parallel version is my second
one which has decent performance.  I also tried a variant which has a
per-thread buffer which is occasionally written to standard output,
synchronized by a sync.Mutex.  (It would likely have benefited from a
sync.Mutex.TryLock() function, delaying the buffer flush if there was
contention.)  This was still significantly slower than the external
parallelization, but perhaps a little faster than the parallel version
attached below.  I think both parallel approaches produce more garbage
than the sequential version, but likely not as much to explain the
speed difference compared to the sequential version with external
parallelization.

“perf top” suggests that a lot of time is spent in GC-related Go
functions (which is expected, considering what the program does).  But
vmstat shows a large number of context switches, which I find
surprising.  It is much higher than the number of context switches
during a GNU parallel run.

Most tests were run with the current master branch.

Is there anything else I could try to make the internally parallelized
version as fast as the externally parallelized one?

package main

import (
        "bytes"
        "bufio"
        "flag"
        "fmt"
        "io"
        openpgp "golang.org/x/crypto/openpgp/packet"
        "os"
        "reflect"
        "runtime"
        "sync"
)

// verbose is set by the -verbose flag.  When true, readFile also
// prints the Go type of every packet it manages to parse.
var verbose bool

// outputLock is held by the helpers in this file that write directly
// to standard output or standard error (uidPrinter.Flush, printError,
// packetError, printPacketType) so that their output does not
// interleave.
var outputLock sync.Mutex

// Output is the sink readFile sends its results to.  Print takes a
// fmt.Printf-style format string and arguments.  The two
// implementations in this file are sequentialOutput (no
// synchronization) and parallelOutput (hands the formatted string to
// a separate writer goroutine).
type Output interface {
        Print(format string, args ...interface{})
}

// packetError acquires outputLock and releases it again on return.
// The body contains nothing else in this listing, so path, packet and
// err are currently unused.
// NOTE(review): presumably meant to report a packet that failed to
// parse -- confirm and restore the reporting code.
func packetError(path string, packet openpgp.OpaquePacket, err error) {
        outputLock.Lock()
        defer outputLock.Unlock()
}

// printPacketType acquires outputLock and releases it again on
// return.  The body contains nothing else in this listing, so packet
// is currently unused.
// NOTE(review): presumably meant to print the packet's type under the
// lock -- confirm and restore the printing code.
func printPacketType(packet openpgp.Packet) {
        outputLock.Lock()
        defer outputLock.Unlock()
}

// uidPrinter stages formatted OpenPGP user IDs in memory and writes
// them to standard output en bloc, so that outputLock is taken once
// per batch instead of once per user ID.
type uidPrinter struct {
        buffer bytes.Buffer // formatted "uid: ..." lines not yet written out
        count int // number of user IDs staged since the last Flush
}

// Print stages one user ID as a "uid: ..." line.  Nothing reaches
// standard output until Flush runs; Print triggers that itself once
// more than 1000 user IDs have been staged, which keeps the buffer
// from growing without bound.
func (p *uidPrinter) Print(uid *openpgp.UserId) {
	fmt.Fprintf(&p.buffer, "uid: %#v\n", uid.Id)
	p.count++
	if p.count <= 1000 {
		// Still below the batch limit: keep staging.
		return
	}
	p.Flush()
}

// Flush writes all staged user IDs to standard output with a single
// Write while holding outputLock, then empties the buffer and resets
// the counter.  A failed write is reported on standard error; the
// staged data is discarded either way so the buffer stays bounded.
func (p *uidPrinter) Flush() {
	outputLock.Lock()
	defer outputLock.Unlock()
	if _, err := os.Stdout.Write(p.buffer.Bytes()); err != nil {
		// outputLock is already held and sync.Mutex is not
		// reentrant, so write to standard error directly here.
		fmt.Fprintf(os.Stderr, "error: writing to standard output: %s\n", err)
	}
	// Reset is the same as Truncate(0): the backing storage is kept
	// for the next batch.
	p.buffer.Reset()
	p.count = 0
}

// printError formats a message fmt.Printf-style and writes it to
// standard error.  outputLock is held for the duration of the call so
// that messages from different goroutines do not interleave.
func printError(format string, args ...interface{}) {
	outputLock.Lock()
	defer outputLock.Unlock()
	message := fmt.Sprintf(format, args...)
	os.Stderr.WriteString(message)
}

// readFile reads the OpenPGP packet stream stored at path and sends a
// "uid: ..." line to output for every user ID packet.  When verbose
// is set, the Go type of every successfully parsed packet is printed
// as well.  Packets that fail to parse are skipped without a report.
//
// It returns nil once the end of the file is reached, or the first
// error encountered while opening the file or reading a packet.
func readFile(path string, output Output) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	packets := openpgp.NewOpaqueReader(bufio.NewReader(file))
	for {
		opaque, readErr := packets.Next()
		if readErr == io.EOF {
			// Clean end of the packet stream.
			return nil
		}
		if readErr != nil {
			return readErr
		}

		parsed, parseErr := opaque.Parse()
		if parseErr != nil {
			// Unparseable packet: move on to the next one.
			continue
		}
		if verbose {
			output.Print("%s\n", reflect.TypeOf(parsed).String())
		}
		if uid, isUID := parsed.(*openpgp.UserId); isUID {
			output.Print("uid: %#v\n", uid.Id)
		}
	}
}

// sequentialOutput is the Output used by the single-goroutine code
// path.  It formats straight into a buffered writer and performs no
// locking of its own.
type sequentialOutput struct {
	buffer *bufio.Writer
}

// newSequentialOutput returns a sequentialOutput whose buffered
// writer wraps os.Stdout.
func newSequentialOutput() *sequentialOutput {
	return &sequentialOutput{bufio.NewWriter(os.Stdout)}
}

// Print formats its arguments fmt.Printf-style into the buffer.  The
// data reaches the underlying writer when the buffer fills up or when
// Stop is called.
func (p *sequentialOutput) Print(format string, args ...interface{}) {
	// A failed write is remembered by the bufio.Writer and returned
	// again by Flush, so it is reported once, in Stop.
	fmt.Fprintf(p.buffer, format, args...)
}

// Stop flushes everything still buffered.  If the flush, or any
// earlier write, failed, the error is reported on standard error
// instead of being dropped silently.
func (p *sequentialOutput) Stop() {
	if err := p.buffer.Flush(); err != nil {
		fmt.Fprintf(os.Stderr, "error: writing to standard output: %s\n", err)
	}
}

// processSequential reads the files one after another on the calling
// goroutine, sending all results to one shared sequentialOutput.  A
// file that cannot be read is reported through printError and does
// not stop the remaining files.  The output is flushed once, after
// the last file.
func processSequential(files []string) {
	output := newSequentialOutput()
	for i := range files {
		path := files[i]
		if err := readFile(path, output); err != nil {
			printError("%s: error: %s\n", path, err.Error())
		}
	}
	output.Stop()
}

// channelOfPaths returns an already-closed channel preloaded with
// every element of paths, in order.  The channel's buffer is sized to
// hold them all, so filling it never blocks, and a receiver ranging
// over the channel terminates once it has been drained.
func channelOfPaths(paths []string) chan string {
	queue := make(chan string, len(paths))
	// Closing happens after the queue is filled; buffered values stay
	// receivable on a closed channel.
	defer close(queue)
	for i := 0; i < len(paths); i++ {
		queue <- paths[i]
	}
	return queue
}

// processFromChannel is the worker loop run by each processing
// goroutine.  It takes paths from filesChannel until that channel is
// closed and drained, reads each file with readFile, and reports a
// failure through printError.  One value is sent on ackChannel per
// path, whether or not reading it succeeded.
func processFromChannel(filesChannel chan string, ackChannel chan struct{},
	output Output) {
	for {
		path, more := <-filesChannel
		if !more {
			return
		}
		if err := readFile(path, output); err != nil {
			printError("%s: error: %s\n", path, err.Error())
		}
		ackChannel <- struct{}{}
	}
}

// parallelOutput is an Output whose Print hands each formatted string
// to a single writer goroutine over dataChannel.  Only that goroutine
// writes to standard output, so strings from different callers are
// never interleaved with each other.
type parallelOutput struct {
	dataChannel chan string   // Strings to print; "" requests termination.
	ackChannel  chan struct{} // Closed once the writer goroutine has flushed.
}

// newParallelOutput creates a parallelOutput and starts its writer
// goroutine.  The goroutine runs until Stop is called.
func newParallelOutput() *parallelOutput {
	p := &parallelOutput{
		dataChannel: make(chan string, 5000),
		ackChannel:  make(chan struct{}),
	}
	go parallelOutputGoroutine(p)
	return p
}

// parallelOutputGoroutine is the writer goroutine.  It copies strings
// from p.dataChannel into a buffered writer on standard output until
// it receives the empty-string termination request (or the channel is
// closed), then flushes and closes p.ackChannel.  A failed flush is
// reported on standard error.
func parallelOutputGoroutine(p *parallelOutput) {
	buf := bufio.NewWriter(os.Stdout)
	for s := range p.dataChannel {
		if s == "" {
			break
		}
		// A failed write is remembered by the bufio.Writer and
		// returned again by Flush below.
		buf.WriteString(s)
	}
	if err := buf.Flush(); err != nil {
		fmt.Fprintf(os.Stderr, "error: writing to standard output: %s\n", err)
	}
	// Tell Stop() that processing is complete.
	close(p.ackChannel)
}

// Print formats its arguments fmt.Printf-style on the caller's
// goroutine and queues the result for the writer goroutine.  An empty
// result is dropped, because the empty string is reserved as the
// termination request.
func (p *parallelOutput) Print(format string, args ...interface{}) {
	s := fmt.Sprintf(format, args...)
	if s != "" {
		p.dataChannel <- s
	}
}

// Stop asks the writer goroutine to terminate and blocks until it has
// written and flushed everything queued before the request.
func (p *parallelOutput) Stop() {
	// Request termination.
	p.dataChannel <- ""
	// Wait for completion of all pending requests.
	<-p.ackChannel
}

// threads is the number of worker goroutines processParallel starts.
// It is set by the -threads flag, whose default is
// runtime.GOMAXPROCS(0).
var threads int

// Process the files in parallel.
func processParallel(files []string) {
        if len(files) == 0 {
                return
        }

        filesChannel := channelOfPaths(files)

        // This channel is used to detect termination.
        ackChannel := make(chan struct{})

        fmt.Fprintf(os.Stderr, "info: threads: %d\n", threads)

        output := newParallelOutput()
        for i := 0; i < threads; i++  {
                go processFromChannel(filesChannel, ackChannel, output)
        }

        // Wait for termination of the conversion.
        acks := 0
        for _ = range ackChannel {
                acks++
                if acks == len(files) {
                        break
                }
        }

        // Wait until all pending entries have been printed.
        output.Stop()
}

// main parses the command line and processes the files named by the
// remaining arguments, either sequentially (-sequential) or with the
// built-in worker goroutines (the default).
func main() {
	flag.BoolVar(&verbose, "verbose", false, "more verbose output")
	sequential := flag.Bool("sequential", false, "disable concurrency")
	flag.IntVar(&threads, "threads", runtime.GOMAXPROCS(0),
		"number of processing threads")
	flag.Parse()

	process := processParallel
	if *sequential {
		process = processSequential
	}
	process(flag.Args())
}

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to