This is an automated email from the ASF dual-hosted git repository. ccollins pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git
commit cf276ea9920ba3429e84648445981402acbd9abd Author: Christopher Collins <ccoll...@apache.org> AuthorDate: Wed Jul 12 18:33:41 2017 -0700 nmxact - Remove some mutexes --- nmxact/example/ble_loop/ble_loop.go | 12 ++- nmxact/example/ble_scan/ble_scan.go | 7 +- nmxact/nmble/ble_act.go | 4 +- nmxact/nmble/ble_fsm.go | 93 ++++++++++++++--------- nmxact/nmble/ble_oic_sesn.go | 145 +++++++++++------------------------- nmxact/nmble/ble_plain_sesn.go | 109 +++++++++++++-------------- nmxact/nmble/ble_xport.go | 110 +++++++++++++-------------- nmxact/nmble/discover.go | 2 +- nmxact/nmble/dispatch.go | 20 +++-- nmxact/nmble/receiver.go | 3 + nmxact/nmxutil/bcast.go | 43 +++++++++++ nmxact/nmxutil/err_funnel.go | 55 ++------------ 12 files changed, 284 insertions(+), 319 deletions(-) diff --git a/nmxact/example/ble_loop/ble_loop.go b/nmxact/example/ble_loop/ble_loop.go index 7a930a9..9e79d29 100644 --- a/nmxact/example/ble_loop/ble_loop.go +++ b/nmxact/example/ble_loop/ble_loop.go @@ -26,8 +26,11 @@ import ( "syscall" "time" + log "github.com/Sirupsen/logrus" + "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmble" + "mynewt.apache.org/newtmgr/nmxact/nmxutil" "mynewt.apache.org/newtmgr/nmxact/sesn" "mynewt.apache.org/newtmgr/nmxact/xact" "mynewt.apache.org/newtmgr/nmxact/xport" @@ -58,11 +61,14 @@ func configExitHandler(x xport.Xport, s sesn.Sesn) { } func main() { + //nmxutil.SetLogLevel(log.DebugLevel) + nmxutil.SetLogLevel(log.InfoLevel) + // Initialize the BLE transport. params := nmble.NewXportCfg() params.SockPath = "/tmp/blehostd-uds" - params.BlehostdPath = "blehostd.elf" - params.DevPath = "/dev/cu.usbmodem142111" + params.BlehostdPath = "blehostd" + params.DevPath = "/dev/cu.usbmodem142121" x, err := nmble.NewBleXport(params) if err != nil { @@ -83,7 +89,7 @@ func main() { // * Peer has name "nimble-bleprph" // * We use a random address. dev, err := nmble.DiscoverDeviceWithName( - x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "nimble-bleprph") + x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "c4") if err != nil { fmt.Fprintf(os.Stderr, "error discovering device: %s\n", err.Error()) os.Exit(1) diff --git a/nmxact/example/ble_scan/ble_scan.go b/nmxact/example/ble_scan/ble_scan.go index 73ca24f..3a01146 100644 --- a/nmxact/example/ble_scan/ble_scan.go +++ b/nmxact/example/ble_scan/ble_scan.go @@ -70,8 +70,8 @@ func main() { // Initialize the BLE transport. params := nmble.NewXportCfg() params.SockPath = "/tmp/blehostd-uds" - params.BlehostdPath = "blehostd.elf" - params.DevPath = "/dev/cu.usbmodem14221" + params.BlehostdPath = "blehostd" + params.DevPath = "/dev/cu.usbmodem142121" x, err := nmble.NewBleXport(params) if err != nil { @@ -105,9 +105,6 @@ func main() { for { sc := scan.BleOmpScanCfg(scanCb) - sc.Ble.ScanPred = func(adv bledefs.BleAdvReport) bool { - return adv.Fields.Name != nil && *adv.Fields.Name == "c5" - } if err := scanner.Start(sc); err != nil { fmt.Fprintf(os.Stderr, "error starting scan: %s\n", err.Error()) os.Exit(1) diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go index 2c4b71d..e51b8e9 100644 --- a/nmxact/nmble/ble_act.go +++ b/nmxact/nmble/ble_act.go @@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r *BleExchangeMtuReq) ( } } -func actScan(x *BleXport, bl *Listener, r *BleScanReq, - abortChan chan struct{}, advRptCb BleAdvRptFn) error { +func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{}, + advRptCb BleAdvRptFn) error { const rspType = MSG_TYPE_SCAN diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index 52b163e..44a7908 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -48,8 +48,11 @@ const ( FSM_DISCONNECT_TYPE_REQUESTED ) -type BleRxDataFn func(data []byte) -type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error) +type BleDisconnectEntry struct { + Dt BleFsmDisconnectType + Peer BleDev + Err error +} type BleFsmParamsCentral struct { PeerDev BleDev @@ -58,15 +61,13 @@ type BleFsmParamsCentral struct { } type BleFsmParams struct { - Bx *BleXport - OwnAddrType BleAddrType - EncryptWhen BleEncryptWhen - Central BleFsmParamsCentral - SvcUuids []BleUuid - ReqChrUuid BleUuid - RspChrUuid BleUuid - RxDataCb BleRxDataFn - DisconnectCb BleDisconnectFn + Bx *BleXport + OwnAddrType BleAddrType + EncryptWhen BleEncryptWhen + Central BleFsmParamsCentral + SvcUuids []BleUuid + ReqChrUuid BleUuid + RspChrUuid BleUuid } type BleFsm struct { @@ -83,8 +84,9 @@ type BleFsm struct { errFunnel nmxutil.ErrFunnel id uint32 - // Conveys changes in encrypted state. - encChan chan error + encBcast nmxutil.Bcaster + disconnectChan chan BleDisconnectEntry + rxNmpChan chan []byte } func NewBleFsm(p BleFsmParams) *BleFsm { @@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm { bf.errFunnel.AccumDelay = 250 * time.Millisecond bf.errFunnel.LessCb = fsmErrorLess - bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) } return bf } @@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType { } } -func (bf *BleFsm) processErr(err error) { +// Listens for an error in the state machine. On error, the session is +// considered disconnected and the error is reported to the client. +func (bf *BleFsm) listenForError() { + err := <-bf.errFunnel.Wait() + + // Stop listening for NMP responses. + close(bf.rxNmpChan) + // Remember some fields before we clear them. dt := calcDisconnectType(bf.state) @@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) { // Wait for all listeners to get removed. bf.rxer.WaitUntilNoListeners() - bf.errFunnel.Reset() - bf.params.DisconnectCb(dt, bf.peerDev, err) + bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err} + close(bf.disconnectChan) + + bf.disconnectChan = make(chan BleDisconnectEntry) } // Listens for events in the background. @@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) error { log.Debugf("Connection encrypted; conn_handle=%d", msg.ConnHandle) } - if bf.encChan != nil { - bf.encChan <- err - } + + // Notify any listeners of the encryption change event. + bf.encBcast.SendAndClear(err) case *BleDisconnectEvt: err := bf.disconnectError(msg.Reason) @@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error { if bf.nmpRspChr != nil && msg.AttrHandle == bf.nmpRspChr.ValHandle { - bf.params.RxDataCb(msg.Data.Bytes) + bf.rxNmpChan <- msg.Data.Bytes } } } @@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error { } defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq) - bf.encChan = make(chan error) - defer func() { bf.encChan = nil }() - // Initiate the encryption procedure. if err := encInitiate(bf.params.Bx, bl, r); err != nil { return err } // Block until the procedure completes. - return <-bf.encChan + itf := <-bf.encBcast.Listen() + return itf.(error) } func (bf *BleFsm) discAllChrs() error { @@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) { return false, nil } +func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry { + return bf.disconnectChan +} + +func (bf *BleFsm) RxNmpChan() <-chan []byte { + return bf.rxNmpChan +} + func (bf *BleFsm) startOnce() (bool, error) { if !bf.IsClosed() { return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf( @@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) { bf.state)) } - bf.errFunnel.Start() - for { retry, err := bf.executeState() if err != nil { bf.errFunnel.Insert(err) - err = bf.errFunnel.Wait() + err = <-bf.errFunnel.Wait() return retry, err } else if bf.state == SESN_STATE_DONE { + // We are fully connected. Listen for errors in the background. + go bf.listenForError() + return false, nil } } @@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) { func (bf *BleFsm) Start() error { var err error + bf.disconnectChan = make(chan BleDisconnectEntry) + bf.rxNmpChan = make(chan []byte) + for i := 0; i < bf.params.Central.ConnTries; i++ { var retry bool retry, err = bf.startOnce() @@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error { } } - return err + if err != nil { + return err + } + + return nil } // @return bool true if stop complete; // false if disconnect is now pending. -func (bf *BleFsm) Stop() (bool, error) { +func (bf *BleFsm) Stop() error { state := bf.state switch state { @@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) { SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING: - return false, - bf.closedError("Attempt to close an unopened BLE session") + return bf.closedError("Attempt to close an unopened BLE session") case SESN_STATE_CONNECTING: bf.connCancel() bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled")) - return false, nil + return nil default: if err := bf.terminate(); err != nil { - return false, err + return err } - return false, nil + return nil } } diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go index 9db1c84..e1bf28b 100644 --- a/nmxact/nmble/ble_oic_sesn.go +++ b/nmxact/nmble/ble_oic_sesn.go @@ -2,7 +2,6 @@ package nmble import ( "fmt" - "sync" "time" "github.com/runtimeco/go-coap" @@ -22,8 +21,7 @@ type BleOicSesn struct { closeTimeout time.Duration onCloseCb sesn.OnCloseFn - closeChan chan error - mtx sync.Mutex + closeChan chan struct{} } func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn { @@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn { ReqChrUuid: reqChrUuid, RspChrUuid: rspChrUuid, EncryptWhen: cfg.Ble.EncryptWhen, - RxDataCb: func(d []byte) { bos.onRxNmp(d) }, - DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) { - bos.onDisconnect(dt, p, e) - }, }) return bos } -// Returns true if a new channel was assigned. -func (bos *BleOicSesn) setCloseChan() error { - bos.mtx.Lock() - defer bos.mtx.Unlock() - - if bos.closeChan != nil { - return fmt.Errorf("Multiple listeners waiting for session to close") - } - - bos.closeChan = make(chan error, 1) - return nil -} - -func (bos *BleOicSesn) clearCloseChan() { - bos.mtx.Lock() - defer bos.mtx.Unlock() - - bos.closeChan = nil +func (bos *BleOicSesn) AbortRx(seq uint8) error { + return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted")) } -func (bos *BleOicSesn) listenForClose(timeout time.Duration) error { - select { - case <-bos.closeChan: - return nil - case <-time.After(timeout): - // Session never closed. - return fmt.Errorf("Timeout while waiting for session to close") - } -} +func (bos *BleOicSesn) Open() error { + // This channel gets closed when the session closes. + bos.closeChan = make(chan struct{}) -func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error { - if err := bos.setCloseChan(); err != nil { + if err := bos.bf.Start(); err != nil { + close(bos.closeChan) return err } - defer bos.clearCloseChan() - // If the session is already closed, we're done. - if bos.bf.IsClosed() { - return nil - } - - // Block until close completes or times out. - return bos.listenForClose(timeout) -} - -func (bos *BleOicSesn) AbortRx(seq uint8) error { - return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted")) -} - -func (bos *BleOicSesn) Open() error { d, err := omp.NewDispatcher(true, 3) if err != nil { + close(bos.closeChan) return err } bos.d = d - if err := bos.bf.Start(); err != nil { + // Listen for disconnect in the background. + go func() { + // Block until disconnect. + entry := <-bos.bf.DisconnectChan() + + // Signal error to all listeners. + bos.d.ErrorAll(entry.Err) bos.d.Stop() - return err - } + + // If the session is being closed, unblock the close() call. + close(bos.closeChan) + + // Only execute the client's disconnect callback if the disconnect was + // unsolicited. + if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb != nil { + bos.onCloseCb(bos, entry.Err) + } + }() + + // Listen for NMP responses in the background. + go func() { + for { + data, ok := <-bos.bf.RxNmpChan() + if !ok { + // Disconnected. + return + } else { + bos.d.Dispatch(data) + } + } + }() + return nil } func (bos *BleOicSesn) Close() error { - if err := bos.setCloseChan(); err != nil { - return err - } - defer bos.clearCloseChan() - - done, err := bos.bf.Stop() + err := bos.bf.Stop() if err != nil { return err } - if done { - // Close complete. - return nil - } - - // Block until close completes or times out. - return bos.listenForClose(bos.closeTimeout) + // Block until close completes. + <-bos.closeChan + return nil } func (bos *BleOicSesn) IsOpen() bool { return bos.bf.IsOpen() } -func (bos *BleOicSesn) onRxNmp(data []byte) { - bos.d.Dispatch(data) -} - -// Called by the FSM when a blehostd disconnect event is received. -func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev, - err error) { - - bos.d.ErrorAll(err) - - bos.mtx.Lock() - - // If the session is being closed, unblock the close() call. - if bos.closeChan != nil { - bos.closeChan <- err - } - - bos.mtx.Unlock() - - // Only stop the dispatcher and execute client's disconnect callback if the - // disconnect was unsolicited and the session was fully open. If the - // session wasn't fully open, the dispatcher will get stopped when the fsm - // start function returns an error (right after this function returns). - if dt == FSM_DISCONNECT_TYPE_OPENED || dt == FSM_DISCONNECT_TYPE_REQUESTED { - bos.d.Stop() - } - - if dt == FSM_DISCONNECT_TYPE_OPENED { - if bos.onCloseCb != nil { - bos.onCloseCb(bos, err) - } - } -} - func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) { return omp.EncodeOmpTcp(m) } diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go index b69d4f7..782fce0 100644 --- a/nmxact/nmble/ble_plain_sesn.go +++ b/nmxact/nmble/ble_plain_sesn.go @@ -16,25 +16,17 @@ type BlePlainSesn struct { closeTimeout time.Duration onCloseCb sesn.OnCloseFn - closeChan chan error + closeChan chan struct{} } func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn { bps := &BlePlainSesn{ - d: nmp.NewDispatcher(1), closeTimeout: cfg.Ble.CloseTimeout, onCloseCb: cfg.OnCloseCb, } - svcUuid, err := ParseUuid(NmpPlainSvcUuid) - if err != nil { - panic(err.Error()) - } - - chrUuid, err := ParseUuid(NmpPlainChrUuid) - if err != nil { - panic(err.Error()) - } + svcUuid, _ := ParseUuid(NmpPlainSvcUuid) + chrUuid, _ := ParseUuid(NmpPlainChrUuid) bps.bf = NewBleFsm(BleFsmParams{ Bx: bx, @@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn { ReqChrUuid: chrUuid, RspChrUuid: chrUuid, EncryptWhen: cfg.Ble.EncryptWhen, - RxDataCb: func(d []byte) { bps.onRxNmp(d) }, - DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) { - bps.onDisconnect(dt, p, e) - }, }) return bps } -func (bps *BlePlainSesn) setCloseChan() error { - if bps.closeChan != nil { - return fmt.Errorf("Multiple listeners waiting for session to close") - } - - bps.closeChan = make(chan error, 1) - return nil -} - -func (bps *BlePlainSesn) clearCloseChan() { - bps.closeChan = nil -} - -func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error { - select { - case <-bps.closeChan: - return nil - case <-time.After(timeout): - // Session never closed. - return fmt.Errorf("Timeout while waiting for session to close") - } -} - func (bps *BlePlainSesn) AbortRx(seq uint8) error { return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted")) } func (bps *BlePlainSesn) Open() error { - return bps.bf.Start() -} + // This channel gets closed when the session closes. + bps.closeChan = make(chan struct{}) -func (bps *BlePlainSesn) Close() error { - if err := bps.setCloseChan(); err != nil { + if err := bps.bf.Start(); err != nil { + close(bps.closeChan) return err } - defer bps.clearCloseChan() - done, err := bps.bf.Stop() + bps.d = nmp.NewDispatcher(3) + + // Listen for disconnect in the background. + go func() { + // Block until disconnect. + entry := <-bps.bf.DisconnectChan() + + // Signal error to all listeners. + bps.d.ErrorAll(entry.Err) + + // If the session is being closed, unblock the close() call. + close(bps.closeChan) + + // Only execute the client's disconnect callback if the disconnect was + // unsolicited. + if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb != nil { + bps.onCloseCb(bps, entry.Err) + } + }() + + // Listen for NMP responses in the background. + go func() { + for { + data, ok := <-bps.bf.RxNmpChan() + if !ok { + // Disconnected. + return + } else { + bps.d.Dispatch(data) + } + } + }() + + return nil +} + +func (bps *BlePlainSesn) Close() error { + err := bps.bf.Stop() if err != nil { return err } - if done { - // Close complete. - return nil - } - - // Block until close completes or times out. - return bps.listenForClose(bps.closeTimeout) + // Block until close completes. + <-bps.closeChan + return nil } func (bps *BlePlainSesn) IsOpen() bool { return bps.bf.IsOpen() } -func (bps *BlePlainSesn) onRxNmp(data []byte) { - bps.d.Dispatch(data) -} - // Called by the FSM when a blehostd disconnect event is received. func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev, err error) { bps.d.ErrorAll(err) - // If someone is waiting for the session to close, unblock them. - if bps.closeChan != nil { - bps.closeChan <- err - } + // If the session is being closed, unblock the close() call. + close(bps.closeChan) // Only execute client's disconnect callback if the disconnect was // unsolicited and the session was fully open. diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go index db0a8e3..67b2d60 100644 --- a/nmxact/nmble/ble_xport.go +++ b/nmxact/nmble/ble_xport.go @@ -84,19 +84,17 @@ const ( // Implements xport.Xport. type BleXport struct { - Bd *Dispatcher - client *unixchild.Client - state BleXportState - stopChan chan struct{} - numStopListeners int - shutdownChan chan bool - readyChan chan error - numReadyListeners int - master nmxutil.SingleResource - slave nmxutil.SingleResource - randAddr *BleAddr - mtx sync.Mutex - scanner *BleScanner + Bd *Dispatcher + client *unixchild.Client + state BleXportState + stopChan chan struct{} + shutdownChan chan bool + readyBcast nmxutil.Bcaster + master nmxutil.SingleResource + slave nmxutil.SingleResource + randAddr *BleAddr + stateMtx sync.Mutex + scanner *BleScanner cfg XportCfg } @@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) { bx := &BleXport{ Bd: NewDispatcher(), shutdownChan: make(chan bool), - readyChan: make(chan error), + readyBcast: nmxutil.Bcaster{}, master: nmxutil.NewSingleResource(), slave: nmxutil.NewSingleResource(), cfg: cfg, @@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) { return bx, nil } -func (bx *BleXport) createUnixChild() { +func (bx *BleXport) startUnixChild() error { config := unixchild.Config{ SockPath: bx.cfg.SockPath, ChildPath: bx.cfg.BlehostdPath, @@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() { } bx.client = unixchild.New(config) + + if err := bx.client.Start(); err != nil { + if unixchild.IsUcAcceptError(err) { + err = nmxutil.NewXportError( + "blehostd did not connect to socket; " + + "controller not attached?") + } else { + err = nmxutil.NewXportError( + "Failed to start child process: " + err.Error()) + } + return err + } + + return nil } func (bx *BleXport) BuildScanner() (scan.Scanner, error) { @@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) { log.Debugf("Shutting down BLE transport") - bx.mtx.Lock() + bx.stateMtx.Lock() var fullyStarted bool var already bool @@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) { bx.state = BLE_XPORT_STATE_STOPPING } - bx.mtx.Unlock() + bx.stateMtx.Unlock() if already { // Shutdown already in progress. @@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) { } // Stop all of this transport's go routines. - log.Debugf("Waiting for BLE transport goroutines to complete") - for i := 0; i < bx.numStopListeners; i++ { - bx.stopChan <- struct{}{} - } + close(bx.stopChan) // Stop the unixchild instance (blehostd + socket). if bx.client != nil { @@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) { } func (bx *BleXport) blockUntilReady() error { - bx.mtx.Lock() + var ch chan interface{} + + bx.stateMtx.Lock() switch bx.state { case BLE_XPORT_STATE_STARTED: // Already started; don't block. - bx.mtx.Unlock() + bx.stateMtx.Unlock() return nil case BLE_XPORT_STATE_DORMANT: // Not in the process of starting; the user will be waiting forever. - bx.mtx.Unlock() + bx.stateMtx.Unlock() return fmt.Errorf("Attempt to use BLE transport without starting it") default: + ch = bx.readyBcast.Listen() } + bx.stateMtx.Unlock() - bx.numReadyListeners++ - bx.mtx.Unlock() - - return <-bx.readyChan -} - -func (bx *BleXport) notifyReadyListeners(err error) { - for i := 0; i < bx.numReadyListeners; i++ { - bx.readyChan <- err - } - bx.numReadyListeners = 0 + itf := <-ch + return itf.(error) } func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { - bx.mtx.Lock() - defer bx.mtx.Unlock() + bx.stateMtx.Lock() + defer bx.stateMtx.Unlock() if bx.state != from { return false @@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { bx.state = to switch bx.state { case BLE_XPORT_STATE_STARTED: - bx.notifyReadyListeners(nil) + bx.readyBcast.SendAndClear(nil) case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT: - bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped")) + bx.readyBcast.SendAndClear( + nmxutil.NewXportError("BLE transport stopped")) default: } @@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error { return nmxutil.NewXportError("BLE xport started twice") } - bx.stopChan = make(chan struct{}) - bx.numStopListeners = 0 - - bx.createUnixChild() - if err := bx.client.Start(); err != nil { - if unixchild.IsUcAcceptError(err) { - err = nmxutil.NewXportError( - "blehostd did not connect to socket; " + - "controller not attached?") - } else { - err = nmxutil.NewXportError( - "Failed to start child process: " + err.Error()) - } + if err := bx.startUnixChild(); err != nil { bx.shutdown(true, err) return err } + bx.stopChan = make(chan struct{}) + // Listen for errors and data from the blehostd process. go func() { - bx.numStopListeners++ for { select { case err := <-bx.client.ErrChild: err = nmxutil.NewXportError("BLE transport error: " + err.Error()) - go bx.shutdown(true, err) + bx.shutdown(true, err) case buf := <-bx.client.FromChild: if len(buf) != 0 { @@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error { // Host and controller are synced. Listen for sync loss in the background. go func() { - bx.numStopListeners++ for { select { case err := <-bl.ErrChan: - go bx.shutdown(true, err) + bx.shutdown(true, err) + return case bm := <-bl.MsgChan: switch msg := bm.(type) { case *BleSyncEvt: if !msg.Synced { - go bx.shutdown(true, nmxutil.NewXportError( + bx.shutdown(true, nmxutil.NewXportError( "BLE host <-> controller sync lost")) } } @@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error { } }() - // Generate a new random address is none was specified. + // Generate a new random address if none was specified. if bx.randAddr == nil { addr, err := GenRandAddrXact(bx) if err != nil { diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go index 3858e75..2ac19ba 100644 --- a/nmxact/nmble/discover.go +++ b/nmxact/nmble/discover.go @@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error { return nmxutil.NewAlreadyError("Attempt to stop inactive discoverer") } - ch <- struct{}{} + close(ch) return nil } diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go index ba59f81..435b123 100644 --- a/nmxact/nmble/dispatch.go +++ b/nmxact/nmble/dispatch.go @@ -55,7 +55,7 @@ type Listener struct { func NewListener() *Listener { return &Listener{ MsgChan: make(chan Msg, 16), - ErrChan: make(chan error, 4), + ErrChan: make(chan error, 1), TmoChan: make(chan time.Time, 1), } } @@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan time.Time { return bl.TmoChan } -func (bl *Listener) Stop() { +func (bl *Listener) Close() { if bl.timer != nil { bl.timer.Stop() } + + close(bl.MsgChan) + close(bl.ErrChan) + close(bl.TmoChan) } type Dispatcher struct { @@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener { base, bl := d.findListener(base) if bl != nil { - bl.Stop() + bl.Close() if base.Seq != BLE_SEQ_NONE { delete(d.seqMap, base.Seq) } else { @@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) { cb := msgCtorMap[opTypePair] if cb == nil { return base, nil, fmt.Errorf( - "Unrecognized op+type pair:") // %s, %s", - //MsgOpToString(base.Op), MsgTypeToString(base.Type)) + "Unrecognized op+type pair: %s, %s", + MsgOpToString(base.Op), MsgTypeToString(base.Type)) } msg := cb() @@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) { } func (d *Dispatcher) ErrorAll(err error) { + if err == nil { + panic("NIL ERROR") + } + d.mtx.Lock() m1 := d.seqMap @@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) { for _, bl := range m1 { bl.ErrChan <- err + bl.Close() } for _, bl := range m2 { bl.ErrChan <- err + bl.Close() } } diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go index d089ba8..8660393 100644 --- a/nmxact/nmble/receiver.go +++ b/nmxact/nmble/receiver.go @@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) { } func (r *Receiver) ErrorAll(err error) { + if err == nil { + panic("NIL ERROR") + } r.mtx.Lock() defer r.mtx.Unlock() diff --git a/nmxact/nmxutil/bcast.go b/nmxact/nmxutil/bcast.go new file mode 100644 index 0000000..a7d68c0 --- /dev/null +++ b/nmxact/nmxutil/bcast.go @@ -0,0 +1,43 @@ +package nmxutil + +import ( + "sync" +) + +type Bcaster struct { + chs [](chan interface{}) + mtx sync.Mutex +} + +func (b *Bcaster) Listen() chan interface{} { + b.mtx.Lock() + defer b.mtx.Unlock() + + ch := make(chan interface{}) + b.chs = append(b.chs, ch) + + return ch +} + +func (b *Bcaster) Send(val interface{}) { + b.mtx.Lock() + chs := b.chs + b.mtx.Unlock() + + for _, ch := range chs { + ch <- val + close(ch) + } +} + +func (b *Bcaster) Clear() { + b.mtx.Lock() + defer b.mtx.Unlock() + + b.chs = nil +} + +func (b *Bcaster) SendAndClear(val interface{}) { + b.Send(val) + b.Clear() +} diff --git a/nmxact/nmxutil/err_funnel.go b/nmxact/nmxutil/err_funnel.go index 58b0ca2..6d6b5a3 100644 --- a/nmxact/nmxutil/err_funnel.go +++ b/nmxact/nmxutil/err_funnel.go @@ -1,7 +1,6 @@ package nmxutil import ( - "fmt" "sync" "time" ) @@ -13,26 +12,15 @@ type ErrProcFn func(err error) // reported. type ErrFunnel struct { LessCb ErrLessFn - ProcCb ErrProcFn AccumDelay time.Duration mtx sync.Mutex resetMtx sync.Mutex curErr error errTimer *time.Timer - started bool waiters [](chan error) } -func (f *ErrFunnel) Start() { - f.resetMtx.Lock() - - f.mtx.Lock() - defer f.mtx.Unlock() - - f.started = true -} - func (f *ErrFunnel) Insert(err error) { if err == nil { panic("ErrFunnel nil insert") @@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) { f.mtx.Lock() defer f.mtx.Unlock() - if !f.started { - panic("ErrFunnel insert without start") - } - if f.curErr == nil { f.curErr = err f.errTimer = time.AfterFunc(f.AccumDelay, func() { @@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) { } } -func (f *ErrFunnel) Reset() { - f.mtx.Lock() - defer f.mtx.Unlock() - - if f.started { - f.started = false - f.curErr = nil - f.errTimer.Stop() - f.resetMtx.Unlock() - } -} - func (f *ErrFunnel) timerExp() { f.mtx.Lock() @@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() { panic("ErrFunnel timer expired but no error") } - f.ProcCb(err) - for _, w := range waiters { w <- err + close(w) } } -func (f *ErrFunnel) Wait() error { - var err error - var c chan error +func (f *ErrFunnel) Wait() chan error { + c := make(chan error) f.mtx.Lock() - - if !f.started { - if f.curErr == nil { - err = fmt.Errorf("Wait on unstarted ErrFunnel") - } else { - err = f.curErr - } - } else { - c = make(chan error) - f.waiters = append(f.waiters, c) - } - + f.waiters = append(f.waiters, c) f.mtx.Unlock() - if err != nil { - return err - } else { - return <-c - } + return c } -- To stop receiving notification emails like this one, please contact "commits@mynewt.apache.org" <commits@mynewt.apache.org>.