I'm trying to parse OpenPGP key server dumps. Throughput is not too bad so far, but I'd like to speed things up by introducing parallelism.
The dumps are split into several files (each about 40 MB in size, with a few thousand keys). The toy version I have so far does not need any coordination between the processing of individual files. This works quite well when running several instances of the sequential version in parallel using GNU parallel: with GOMAXPROCS=1, execution time is reduced from 141 seconds (serial) to 31 seconds. This is roughly what I would expect from six cores with hyperthreading and the use of GNU parallel (which has some overhead of its own in this scenario). However, the version with built-in parallelism runs in 55 seconds, so it is only about half as fast as the GNU parallel approach. I would have expected it to fare better. The parallel version is the second variant I wrote that has decent performance. I also tried a variant with a per-thread buffer that is occasionally written to standard output, synchronized by a sync.Mutex. (It would likely have benefited from a sync.Mutex.TryLock() function, delaying the buffer flush if there was contention.) This was still significantly slower than the external parallelization, but perhaps a little faster than the parallel version attached below. I think both parallel approaches produce more garbage than the sequential version, but likely not enough to explain the speed difference compared to the sequential version with external parallelization. “perf top” suggests that a lot of time is spent in GC-related Go functions (which is expected, considering what the program does). But vmstat shows a large number of context switches, which I find surprising: it is much higher than the number of context switches during a GNU parallel run. Most tests were run with the current master branch. Is there anything else I could try to make the internally parallelized version as fast as the externally parallelized one?
package main import ( "bytes" "bufio" "flag" "fmt" "io" openpgp "golang.org/x/crypto/openpgp/packet" "os" "reflect" "runtime" "sync" ) var verbose bool var outputLock sync.Mutex // Print the string to standard output, with optional synchronization. type Output interface { Print(format string, args ...interface{}) } func packetError(path string, packet openpgp.OpaquePacket, err error) { outputLock.Lock() defer outputLock.Unlock() } func printPacketType(packet openpgp.Packet) { outputLock.Lock() defer outputLock.Unlock() } // A printer for OpenPGP user IDs which writes multiple key IDs en // bloc, to avoid lock contention. type uidPrinter struct { buffer bytes.Buffer count int } // Print one user ID. The user ID might not be printed until Flush() // is called. func (p *uidPrinter) Print(uid *openpgp.UserId) { fmt.Fprintf(&p.buffer, "uid: %#v\n", uid.Id) p.count++ // Prevent the buffer from becoming too large. if p.count > 1000 { p.Flush() } } // Print all the staged key IDs. func (p *uidPrinter) Flush() { outputLock.Lock() defer outputLock.Unlock() os.Stdout.Write(p.buffer.Bytes()) p.buffer.Truncate(0) p.count = 0 } func printError(format string, args ...interface{}) { outputLock.Lock() defer outputLock.Unlock() fmt.Fprintf(os.Stderr, format, args...) } func readFile(path string, output Output) error { file, err := os.Open(path) if err != nil { return err } defer file.Close() buf := bufio.NewReader(file) packets := openpgp.NewOpaqueReader(buf) for { op, err := packets.Next() if err != nil { if (err == io.EOF) { break } return err } p, err := op.Parse() if err != nil { continue } if verbose { output.Print("%s\n", reflect.TypeOf(p).String()) } if uid, ok := p.(*openpgp.UserId); ok { output.Print("uid: %#v\n", uid.Id) } } return nil } // Buffered output to standard output, without synchronization. 
type sequentialOutput struct { buffer *bufio.Writer } func newSequentialOutput() *sequentialOutput { return &sequentialOutput{bufio.NewWriter(os.Stdout)} } func (p *sequentialOutput) Print(format string, args ...interface{}) { fmt.Fprintf(p.buffer, format, args...) } func (p *sequentialOutput) Stop() { p.buffer.Flush() } func processSequential(files []string) { output := newSequentialOutput() for _, path := range files { err := readFile(path, output) if err != nil { printError("%s: error: %s\n", path, err.Error()) } } output.Stop() } // Return a channel a closed which contains the given path strings. func channelOfPaths(paths []string) chan string { ch := make(chan string, len(paths)) for _, path := range paths { ch <- path } close(ch) return ch } // Spawned as a goroutine to process files from a channel. func processFromChannel(filesChannel chan string, ackChannel chan struct{}, output Output) { for path := range filesChannel { err := readFile(path, output) if err != nil { printError("%s: error: %s\n", path, err.Error()) } ackChannel <- struct{}{} } } // Parallel output to standard output, with synchronization to prevent // interleaving. type parallelOutput struct { dataChannel chan string // Strings to print, "" means termiantion. ackChannel chan struct{} // Signals completed termination request. } func newParallelOutput() *parallelOutput { p := ¶llelOutput{ dataChannel: make(chan string, 5000), ackChannel: make(chan struct{})} go parallelOutputGoroutine(p) return p } func parallelOutputGoroutine(p *parallelOutput) { var buf = bufio.NewWriter(os.Stdout) for s := range p.dataChannel { if s == "" { break } buf.WriteString(s) } buf.Flush() // Tell Stop() that processing is complete. close(p.ackChannel) } func (p *parallelOutput) Print(format string, args ...interface{}) { s := fmt.Sprintf(format, args...) if s != "" { p.dataChannel <- s } } func (p *parallelOutput) Stop() { // Request termination. p.dataChannel <- "" // Wait for completion of all pending requests. 
_, _ = <- p.ackChannel } // Number of parallel threads, controlled by the -threads flag. var threads int // Process the files in parallel. func processParallel(files []string) { if len(files) == 0 { return } filesChannel := channelOfPaths(files) // This channel is used to detect termination. ackChannel := make(chan struct{}) fmt.Fprintf(os.Stderr, "info: threads: %d\n", threads) output := newParallelOutput() for i := 0; i < threads; i++ { go processFromChannel(filesChannel, ackChannel, output) } // Wait for termination of the conversion. acks := 0 for _ = range ackChannel { acks++ if acks == len(files) { break } } // Wait until all pending entries have been printed. output.Stop() } func main() { var sequential bool flag.BoolVar(&verbose, "verbose", false, "more verbose output") flag.BoolVar(&sequential, "sequential", false, "disable concurrency") flag.IntVar(&threads, "threads", runtime.GOMAXPROCS(0), "number of processing threads") flag.Parse() files := flag.Args() if (sequential) { processSequential(files) } else { processParallel(files) } } -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.