PROTON-1306: Go container improvements and client-server example. Added convenience ops to Container: Dial and Accept. Added client-server documentation example. Added Container.String(), improve other proton String() funcs.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6134e216 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6134e216 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6134e216 Branch: refs/heads/master Commit: 6134e216caf952aa031a49ab09d3e59ba71b1965 Parents: 77b907b Author: Alan Conway <acon...@redhat.com> Authored: Fri Sep 9 12:37:01 2016 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Tue Sep 20 17:43:35 2016 -0400 ---------------------------------------------------------------------- examples/go/README.md | 104 +++++++++++-------- examples/go/electron/broker.go | 9 +- examples/go/electron/receive.go | 8 +- examples/go/electron/send.go | 9 +- .../bindings/go/src/qpid.apache.org/README.md | 5 +- .../go/src/qpid.apache.org/amqp/message.go | 2 + .../src/qpid.apache.org/electron/connection.go | 35 ++++++- .../src/qpid.apache.org/electron/container.go | 52 +++++++--- .../go/src/qpid.apache.org/electron/doc.go | 10 ++ .../electron/ex_client_server_test.go | 81 +++++++++++++++ .../go/src/qpid.apache.org/electron/sender.go | 10 +- .../go/src/qpid.apache.org/proton/wrappers.go | 4 +- 12 files changed, 237 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/README.md ---------------------------------------------------------------------- diff --git a/examples/go/README.md b/examples/go/README.md index 9ba497b..24f4d2a 100644 --- a/examples/go/README.md +++ b/examples/go/README.md @@ -1,36 +1,34 @@ -# Go examples for proton +# Go examples -There are 3 Go packages for proton: +## Electron examples -- qpid.apache.org/electron: Concurrent, procedural API for messaging clients and servers. -- qpid.apache.org/proton: Direct access to the event-driven, concurrent-unsafe proton library. -- qpid.apache.org/amqp: Convert AMQP messages and data to and from Go data types. +[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a +simple API for writing concurrent AMQP clients and servers. -`proton` and `electron` are alternative APIs for sending messages. `proton` is a -direct wrapping of the concurrent-unsafe, event-driven C proton API. `electron` -is a procedural, concurrent-safe interface that may be more convenient and -familiar for Go programmers. The examples `proton/broker.go` and -`electron/broker.go` give an illustration of how the APIs differ. - -## Example programs - -electron - [receive.go](electron/receive.go) receive from many connections concurrently. - [send.go](electron/send.go) send to many connections concurrently. - [broker.go](electron/broker.go) a simple broker using the electron API +n +## Proton examples + +[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an +event-driven, concurrent-unsafe Go wrapper for the proton-C library. The +[electron](http://godoc.org/qpid.apache.org/electron) package provides a more +Go-friendly concurrent API built on top of proton. -proton - [broker.go](proton/broker.go) a simple broker using the proton API +See [A Tale of Two Brokers](#a-tale-of-two-brokers) for a comparison of the two APIs. + ## Using the Go packages -If you have the proton C library and headers installed you can get the latest go +If you have the proton-C library and headers installed you can get the latest go packages with go get qpid.apache.org/electron -If proton is installed in a non-standard place (other than /usr or /usr/local) you -can set these environment variables before `go get`, for example: +If Proton-C is installed in a non-standard place (other than /usr or /usr/local) +you should set these environment variables before `go get`: export CGO_LDFLAGS="-L/<my-proton>/lib[64]" export CGO_CFLAGS="-I/<my-proton>/include" @@ -77,43 +75,57 @@ Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 messag The broker and clients use the standard AMQP port (5672) on the local host by default, to use a different address use the `-addr host:port` flag. -If you have the full proton repository checked out you can try try using the -python broker with Go clients: +If you have other Proton examples available you can try communicating between +programs in in different languages. For example use the python broker with Go +clients: python ../python/broker.py + go run send.go -count 10 localhost:/foo localhost:/bar Or use the Go broker and the python clients: + go run broker.go -debug python ../python/simple_send.py python ../python/simple_recv.py ## A tale of two brokers. -The `proton` and `electron` packages provide two alternate APIs for AMQP applications. -See [the proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) for a discussion -of why there are two APIs. - -The examples `proton/broker.go` and `electron/broker.go` both implement the same -simple broker-like functionality using each of the two APIs. They both handle -multiple connections concurrently and store messages on bounded queues -implemented by Go channels. - -However the `electron/broker` is less than half as long as the `proton/broker` -illustrating why it is better suited for most Go applications. - -`proton/broker` must explicitly handle proton events, which are processed in a -single goroutine per connection since proton is not concurrent safe. Each -connection uses channels to exchange messages between the event-handling -goroutine and the shared queues that are accessible to all connections. Sending -messages is particularly tricky since we must monitor the queue for available -messages and the sending link for available credit in order to send messages. - - -`electron/broker` takes advantage of the `electron` package, which hides all the -event handling and passing of messages between goroutines beind behind -straightforward interfaces for sending and receiving messages. The electron -broker can implement links as simple goroutines that loop popping messages from -a queue and sending them or receiving messages and pushing them to a queue. - +The [proton](http://godoc.org/qpid.apache.org/proton) and +[electron](http://godoc.org/qpid.apache.org/electron) packages provide two +different APIs for building AMQP applications. For most applications, +[electron](http://godoc.org/qpid.apache.org/electron) is easier to use. +[The proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) +has some discussion about why there are two APIs. + +The examples [proton/broker.go](proton/broker.go) and +[electron/broker.go](electron/broker.go) implement the same simple broker +functionality using each of the two APIs. They both handle multiple connections +concurrently and store messages on bounded queues implemented by Go channels. + +However the [electron/broker.go](electron/broker.go) is less than half as long as the +[proton/broker.go](proton/broker.go) illustrating why it is better suited for most Go +applications. + +[proton/broker.go](proton/broker.go) implements an event-driven loop per connection that reacts +to events like 'incoming link', 'incoming message' and 'sender has credit'. It +uses channels to exchange data between the event-loop goroutine for each +connection and shared queues that are accessible to all connections. Sending +messages is particularly tricky, the broker must monitor the queue for available +messages and the sender link for available credit. + + +[electron/broker.go](electron/broker.go) does not need any "upside-down" +event-driven code, it is implemented as straightforward loops. The broker is a +loop listening for connections. Each connection is a loop accepting for incoming +sender or recdiver links. Each receiving link is a loop that receives a message +and pushes it to a queue. Each sending link is a loop that pops a message from +a queue and sends it. + +Queue bounds and credit manage themselves: popping from a queue blocks till +there is a message, sending blocks until there is credit, receiving blocks till +something is received and pushing onto a queue blocks until there is +space. There's no need for code that monitors the state of multiple queues and +links. Each loop has one simple job to do, and the Go run-time schedules them +efficiently. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go index d7aedcc..d698838 100644 --- a/examples/go/electron/broker.go +++ b/examples/go/electron/broker.go @@ -55,7 +55,7 @@ func main() { flag.Parse() b := &broker{ queues: util.MakeQueues(*qsize), - container: electron.NewContainer(""), + container: electron.NewContainer(fmt.Sprintf("broker[%s]", os.Getpid())), acks: make(chan electron.Outcome), sent: make(chan sentMessage), } @@ -92,16 +92,11 @@ func (b *broker) run() error { // Start a goroutine for each new connections for { - conn, err := listener.Accept() + c, err := b.container.Accept(listener) if err != nil { util.Debugf("Accept error: %v", err) continue } - c, err := b.container.Connection(conn, electron.Server(), electron.AllowIncoming()) - if err != nil { - util.Debugf("Connection error: %v", err) - continue - } cc := &connection{b, c} go cc.run() // Handle the connection util.Debugf("Accepted %v", c) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go index 94ee509..7a505d8 100644 --- a/examples/go/electron/receive.go +++ b/examples/go/electron/receive.go @@ -24,9 +24,7 @@ import ( "flag" "fmt" "log" - "net" "os" - "path" "qpid.apache.org/amqp" "qpid.apache.org/electron" "sync" @@ -59,8 +57,7 @@ func main() { var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. wait.Add(len(urls)) // Wait for one goroutine per URL. - _, prog := path.Split(os.Args[0]) - container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid())) + container := electron.NewContainer(fmt.Sprintf("receive[%s]", os.Getpid())) connections := make(chan electron.Connection, len(urls)) // Connections to close on exit // Start a goroutine to for each URL to receive messages and send them to the messages channel. @@ -74,9 +71,8 @@ func main() { util.ExitIf(err) // Open a new connection - conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" + c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" util.ExitIf(err) - c, err := container.Connection(conn) connections <- c // Save connection so we can Close() when main() ends // Create a Receiver using the path of the URL as the source address http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/send.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go index 04ec2ef..4ea93ec 100644 --- a/examples/go/electron/send.go +++ b/examples/go/electron/send.go @@ -24,9 +24,7 @@ import ( "flag" "fmt" "log" - "net" "os" - "path" "qpid.apache.org/amqp" "qpid.apache.org/electron" "sync" @@ -58,8 +56,7 @@ func main() { var wait sync.WaitGroup wait.Add(len(urls)) // Wait for one goroutine per URL. - _, prog := path.Split(os.Args[0]) - container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid())) + container := electron.NewContainer(fmt.Sprintf("send[%s]", os.Getpid())) connections := make(chan electron.Connection, len(urls)) // Connctions to close on exit // Start a goroutine for each URL to send messages. @@ -72,9 +69,7 @@ func main() { util.ExitIf(err) // Open a new connection - conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - util.ExitIf(err) - c, err := container.Connection(conn) + c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" util.ExitIf(err) connections <- c // Save connection so we can Close() when main() ends http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/README.md b/proton-c/bindings/go/src/qpid.apache.org/README.md index 4b2da12..ffd67f8 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/README.md +++ b/proton-c/bindings/go/src/qpid.apache.org/README.md @@ -16,9 +16,10 @@ with goroutines and channels to build concurrent AMQP clients and servers. [qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an event-driven, concurrent-unsafe package that closely follows the proton C -API. Most Go programmers will find the electron package easier to use. +API. Most Go programmers will find the +[electron](http://godoc.org/qpid.apache.org/electron) package easier to use. -There are [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md) +See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md) to help you get started. Feedback is encouraged at: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go index 1d1287f..4ae36f4 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go +++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go @@ -344,3 +344,5 @@ func (m *message) Encode(buffer []byte) ([]byte, error) { } // TODO aconway 2015-09-14: Multi-section messages. + +// TODO aconway 2016-09-09: Message.String() use inspect. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index d0733f2..00c08ad 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -141,6 +141,12 @@ func AllowIncoming() ConnectionOption { return func(c *connection) { c.incoming = make(chan Incoming) } } +// Parent returns a ConnectionOption that associates the Connection with it's Container +// If not set a connection will create its own default container. +func Parent(cont Container) ConnectionOption { + return func(c *connection) { c.container = cont.(*container) } +} + type connection struct { endpoint connectionSettings @@ -158,10 +164,10 @@ type connection struct { defaultSession Session } -func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { +// NewConnection creates a connection with the given options. +func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection, error) { c := &connection{ - container: cont, - conn: conn, + conn: conn, } c.handler = newHandler(c) var err error @@ -170,10 +176,13 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) return nil, err } c.pConnection = c.engine.Connection() - c.pConnection.SetContainer(cont.Id()) for _, set := range setting { set(c) } + if c.container == nil { + c.container = NewContainer("").(*container) + } + c.pConnection.SetContainer(c.container.Id()) globalSASLInit(c.engine) c.endpoint.init(c.engine.String()) @@ -351,3 +360,21 @@ func globalSASLInit(eng *proton.Engine) { sasl.ConfigPath(globalSASLConfigDir) } } + +// Dial is shorthand for using net.Dial() then NewConnection() +func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) { + conn, err := net.Dial(network, addr) + if err == nil { + c, err = NewConnection(conn, opts...) + } + return +} + +// DialWithDialer is shorthand for using dialer.Dial() then NewConnection() +func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) { + conn, err := dialer.Dial(network, addr) + if err == nil { + c, err = NewConnection(conn, opts...) + } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go index 1ab4df2..4cf5969 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go @@ -26,8 +26,11 @@ import ( "sync/atomic" ) -// Container is an AMQP container, it represents a single AMQP "application".It -// provides functions to create new Connections to remote containers. +// Container is an AMQP container, it represents a single AMQP "application" +// which can have multiple client or server connections. +// +// Each Container in a distributed AMQP application must have a unique +// container-id which is applied to its connections. // // Create with NewContainer() // @@ -35,18 +38,19 @@ type Container interface { // Id is a unique identifier for the container in your distributed application. Id() string - // Enable AMQP over the supplied net.Conn. Returns a Connection endpoint. - // - // For client connections (e.g. established with net.Dial()), you can start - // using the connection immediately. Connection.Incoming() is disabled by - // default for clients, pass an AllowIncoming() option to enable incoming - // sessions and links. - // - // For server connection (e.g. established with net.Listener.Accept()) you - // must pass the Server() option and receive from the Connection.Incoming() - // channel. The first Incoming value will be an *IncomingConnection that lets - // you examine the connection properties before Accept() or Reject() - Connection(net.Conn, ...ConnectionOption) (Connection, error) + // Connection creates a connection associated with this container. + Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) + + // Dial is shorthand for + // conn, err := net.Dial(); c, err := Connection(conn, opts...) + Dial(network string, addr string, opts ...ConnectionOption) (Connection, error) + + // Accept is shorthand for: + // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...) + Accept(l net.Listener, opts ...ConnectionOption) (Connection, error) + + // String returns Id() + String() string } type container struct { @@ -73,10 +77,26 @@ func NewContainer(id string) Container { func (cont *container) Id() string { return cont.id } +func (cont *container) String() string { return cont.Id() } + func (cont *container) nextLinkName() string { return cont.id + "@" + cont.nextTag() } -func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) { - return newConnection(conn, cont, setting...) +func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) { + return NewConnection(conn, append(opts, Parent(cont))...) +} + +func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) { + if conn, err := net.Dial(network, address); err == nil { + c, err = cont.Connection(conn, opts...) + } + return +} + +func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) { + if conn, err := l.Accept(); err == nil { + c, err = cont.Connection(conn, append(opts, Server())...) + } + return } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go index 207d8ba..436e5df 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go @@ -36,6 +36,16 @@ links to Send() and Receive() messages. Connection.Incoming() lets you accept incoming links opened by the remote peer. You can open and accept multiple links in both directions on a single Connection. +Some of the documentation examples show client and server side by side in a +single program, in separate goroutines. This is only for example purposes, real +AMQP applications would run in separate processes on the network. +More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md + +Some of the documentation examples show client and server side by side in a +single program, in separate goroutines. This is only for example purposes, real +AMQP applications would run in separate processes on the network. +More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md + */ package electron http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go new file mode 100644 index 0000000..93f275b --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go @@ -0,0 +1,81 @@ +package electron_test + +import ( + "fmt" + "net" + "qpid.apache.org/amqp" + "qpid.apache.org/electron" +) + +// Print errors +func check(msg string, err error) bool { + if err != nil { + fmt.Printf("%s: %s\n", msg, err) + } + return err == nil +} + +func runServer(cont electron.Container, l net.Listener) { + for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) { + go func() { // Process connections concurrently, accepting AMQP endpoints + for in := range c.Incoming() { + ep := in.Accept() // Accept all endpoints + go func() { // Process endpoints concurrently + switch ep := ep.(type) { + case electron.Sender: + m := amqp.NewMessageWith("hello yourself") + fmt.Printf("server %q sending %q\n", ep.Source(), m.Body()) + ep.SendForget(m) // One-way send, client does not need to Accept. + case electron.Receiver: + if rm, err := ep.Receive(); check("server receive", err) { + fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body()) + err := rm.Accept() // Client is waiting for Accept. + check("accept message", err) + } + } + }() + } + }() + } +} + +func startServer() (addr net.Addr) { + cont := electron.NewContainer("server") + if l, err := net.Listen("tcp", ""); check("listen", err) { + addr = l.Addr() + go runServer(cont, l) + } + return addr +} + +// Connect to addr and send/receive a message. +func client(addr net.Addr) { + if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) { + defer c.Close(nil) + if s, err := c.Sender(electron.Target("target")); check("sender", err) { + fmt.Printf("client sending\n") + s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept() + } + if r, err := c.Receiver(electron.Source("source")); check("receiver", err) { + if rm, err := r.Receive(); err == nil { + fmt.Printf("client received %q\n", rm.Message.Body()) + } + } + } +} + +// Example client and server communicating via AMQP over a TCP/IP connection. +// +// Normally client and server would be separate processes. +// For more realistic examples: +// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md +// +func Example_clientServer() { + addr := startServer() + client(addr) + // Output: + // client sending + // server "target" received "hello" + // server "source" sending "hello yourself" + // client received "hello yourself" +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go index 8badf35..f46fdc4 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go @@ -34,7 +34,7 @@ import ( // The result of sending a message is provided by an Outcome value. // // A sender can buffer messages up to the credit limit provided by the remote receiver. -// Send* methods will block if the buffer is full until there is space. +// All the Send* methods will block if the buffer is full until there is space. // Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. // type Sender interface { @@ -47,10 +47,14 @@ type Sender interface { // SendWaitable puts a message in the send buffer and returns a channel that // you can use to wait for the Outcome of just that message. The channel is - // buffered so you can receive from it whenever you want without blocking anything. + // buffered so you can receive from it whenever you want without blocking. + // + // Note: can block if there is no space to buffer the message. SendWaitable(m amqp.Message) <-chan Outcome // SendForget buffers a message for sending and returns, with no notification of the outcome. + // + // Note: can block if there is no space to buffer the message. SendForget(m amqp.Message) // SendAsync puts a message in the send buffer and returns immediately. An @@ -63,6 +67,8 @@ type Sender interface { // goroutines to avoid blocking the connection. // // If ack == nil no Outcome is sent. + // + // Note: can block if there is no space to buffer the message. SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go index 3303f0a..fa3e850 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -294,7 +294,7 @@ func (s Session) Receiver(name string) Link { // Unique (per process) string identifier for a connection, useful for debugging. func (c Connection) String() string { // Use the transport address to match the default transport logs from PN_TRACE. - return fmt.Sprintf("%p", c.Transport().CPtr()) + return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr()) } func (c Connection) Type() string { @@ -338,7 +338,7 @@ func (c Connection) SetPassword(password []byte) { } func (s Session) String() string { - return fmt.Sprintf("%s/%p", s.Connection(), s.pn) + return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: should print channel number. } func (s Session) Type() string { return "session" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org