Protocol state machine on receiving side

This commit is contained in:
Jakob Borg 2014-07-02 21:33:30 +02:00
parent 53898d2c60
commit 6ade27641d
1 changed files with 25 additions and 0 deletions

View File

@ -28,6 +28,12 @@ const (
messageTypeIndexUpdate = 6 messageTypeIndexUpdate = 6
) )
const (
stateInitial = iota
stateCCRcvd
stateIdxRcvd
)
const ( const (
FlagDeleted uint32 = 1 << 12 FlagDeleted uint32 = 1 << 12
FlagInvalid = 1 << 13 FlagInvalid = 1 << 13
@ -70,6 +76,7 @@ type Connection interface {
type rawConnection struct { type rawConnection struct {
id string id string
receiver Model receiver Model
state int
reader io.ReadCloser reader io.ReadCloser
cr *countingReader cr *countingReader
@ -116,6 +123,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
c := rawConnection{ c := rawConnection{
id: nodeID, id: nodeID,
receiver: nativeModel{receiver}, receiver: nativeModel{receiver},
state: stateInitial,
reader: flrd, reader: flrd,
cr: cr, cr: cr,
xr: xdr.NewReader(flrd), xr: xdr.NewReader(flrd),
@ -257,21 +265,34 @@ func (c *rawConnection) readerLoop() (err error) {
switch hdr.msgType { switch hdr.msgType {
case messageTypeIndex: case messageTypeIndex:
if c.state < stateCCRcvd {
return fmt.Errorf("protocol error: index message in state %d", c.state)
}
if err := c.handleIndex(); err != nil { if err := c.handleIndex(); err != nil {
return err return err
} }
c.state = stateIdxRcvd
case messageTypeIndexUpdate: case messageTypeIndexUpdate:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: index update message in state %d", c.state)
}
if err := c.handleIndexUpdate(); err != nil { if err := c.handleIndexUpdate(); err != nil {
return err return err
} }
case messageTypeRequest: case messageTypeRequest:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: request message in state %d", c.state)
}
if err := c.handleRequest(hdr); err != nil { if err := c.handleRequest(hdr); err != nil {
return err return err
} }
case messageTypeResponse: case messageTypeResponse:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: response message in state %d", c.state)
}
if err := c.handleResponse(hdr); err != nil { if err := c.handleResponse(hdr); err != nil {
return err return err
} }
@ -283,9 +304,13 @@ func (c *rawConnection) readerLoop() (err error) {
c.handlePong(hdr) c.handlePong(hdr)
case messageTypeClusterConfig: case messageTypeClusterConfig:
if c.state != stateInitial {
return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
}
if err := c.handleClusterConfig(); err != nil { if err := c.handleClusterConfig(); err != nil {
return err return err
} }
c.state = stateCCRcvd
default: default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)