diff --git a/lib/connections/config.go b/lib/connections/config.go index 837c181fa..7824ae7b8 100644 --- a/lib/connections/config.go +++ b/lib/connections/config.go @@ -7,10 +7,9 @@ package connections import ( - "io/ioutil" "time" - "github.com/hashicorp/yamux" + "github.com/xtaci/smux" ) const ( @@ -25,12 +24,10 @@ const ( ) var ( - yamuxConfig = &yamux.Config{ - AcceptBacklog: 256, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: 256 * 1024, - LogOutput: ioutil.Discard, + smuxConfig = &smux.Config{ + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 4096, + MaxReceiveBuffer: 4 * 1024 * 1024, } ) diff --git a/lib/connections/kcp_dial.go b/lib/connections/kcp_dial.go index 702b433e8..280641c95 100644 --- a/lib/connections/kcp_dial.go +++ b/lib/connections/kcp_dial.go @@ -11,10 +11,10 @@ import ( "net/url" "time" - "github.com/hashicorp/yamux" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/protocol" "github.com/xtaci/kcp-go" + "github.com/xtaci/smux" ) func init() { @@ -56,7 +56,7 @@ func (d *kcpDialer) Dial(id protocol.DeviceID, uri *url.URL) (internalConn, erro conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize) conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl)) - ses, err := yamux.Client(conn, yamuxConfig) + ses, err := smux.Client(conn, smuxConfig) if err != nil { conn.Close() return internalConn{}, err diff --git a/lib/connections/kcp_listen.go b/lib/connections/kcp_listen.go index 5b44ca39e..99cbf7b7a 100644 --- a/lib/connections/kcp_listen.go +++ b/lib/connections/kcp_listen.go @@ -16,10 +16,10 @@ import ( "github.com/AudriusButkevicius/pfilter" "github.com/ccding/go-stun/stun" - "github.com/hashicorp/yamux" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/nat" "github.com/xtaci/kcp-go" + "github.com/xtaci/smux" ) func init() { @@ -116,16 +116,16 @@ func (t *kcpListener) Serve() { l.Debugln("connect from", conn.RemoteAddr()) - ses, err := yamux.Server(conn, yamuxConfig) + ses, err := smux.Server(conn, smuxConfig) if err != nil { - l.Debugln("yamux server:", err) + l.Debugln("smux server:", err) conn.Close() continue } stream, err := ses.AcceptStream() if err != nil { - l.Debugln("yamux accept:", err) + l.Debugln("smux accept:", err) ses.Close() continue } diff --git a/lib/connections/kcp_misc.go b/lib/connections/kcp_misc.go index 0e50acff7..c11cfe957 100644 --- a/lib/connections/kcp_misc.go +++ b/lib/connections/kcp_misc.go @@ -16,7 +16,7 @@ import ( "time" "github.com/AudriusButkevicius/pfilter" - "github.com/hashicorp/yamux" + "github.com/xtaci/smux" ) var ( @@ -162,8 +162,8 @@ func (f *stunFilter) reap() { } type sessionClosingStream struct { - *yamux.Stream - session *yamux.Session + *smux.Stream + session *smux.Session } func (w *sessionClosingStream) Close() error { diff --git a/vendor/github.com/hashicorp/yamux/LICENSE b/vendor/github.com/hashicorp/yamux/LICENSE deleted file mode 100644 index f0e5c79e1..000000000 --- a/vendor/github.com/hashicorp/yamux/LICENSE +++ /dev/null @@ -1,362 +0,0 @@ -Mozilla Public License, version 2.0 - -1. Definitions - -1.1. "Contributor" - - means each individual or legal entity that creates, contributes to the - creation of, or owns Covered Software. - -1.2. "Contributor Version" - - means the combination of the Contributions of others (if any) used by a - Contributor and that particular Contributor's Contribution. - -1.3. "Contribution" - - means Covered Software of a particular Contributor. - -1.4. "Covered Software" - - means Source Code Form to which the initial Contributor has attached the - notice in Exhibit A, the Executable Form of such Source Code Form, and - Modifications of such Source Code Form, in each case including portions - thereof. - -1.5. "Incompatible With Secondary Licenses" - means - - a. that the initial Contributor has attached the notice described in - Exhibit B to the Covered Software; or - - b. that the Covered Software was made available under the terms of - version 1.1 or earlier of the License, but not also under the terms of - a Secondary License. - -1.6. "Executable Form" - - means any form of the work other than Source Code Form. - -1.7. "Larger Work" - - means a work that combines Covered Software with other material, in a - separate file or files, that is not Covered Software. - -1.8. "License" - - means this document. - -1.9. "Licensable" - - means having the right to grant, to the maximum extent possible, whether - at the time of the initial grant or subsequently, any and all of the - rights conveyed by this License. - -1.10. "Modifications" - - means any of the following: - - a. any file in Source Code Form that results from an addition to, - deletion from, or modification of the contents of Covered Software; or - - b. any new file in Source Code Form that contains any Covered Software. - -1.11. "Patent Claims" of a Contributor - - means any patent claim(s), including without limitation, method, - process, and apparatus claims, in any patent Licensable by such - Contributor that would be infringed, but for the grant of the License, - by the making, using, selling, offering for sale, having made, import, - or transfer of either its Contributions or its Contributor Version. - -1.12. "Secondary License" - - means either the GNU General Public License, Version 2.0, the GNU Lesser - General Public License, Version 2.1, the GNU Affero General Public - License, Version 3.0, or any later versions of those licenses. - -1.13. "Source Code Form" - - means the form of the work preferred for making modifications. - -1.14. "You" (or "Your") - - means an individual or a legal entity exercising rights under this - License. For legal entities, "You" includes any entity that controls, is - controlled by, or is under common control with You. For purposes of this - definition, "control" means (a) the power, direct or indirect, to cause - the direction or management of such entity, whether by contract or - otherwise, or (b) ownership of more than fifty percent (50%) of the - outstanding shares or beneficial ownership of such entity. - - -2. License Grants and Conditions - -2.1. Grants - - Each Contributor hereby grants You a world-wide, royalty-free, - non-exclusive license: - - a. under intellectual property rights (other than patent or trademark) - Licensable by such Contributor to use, reproduce, make available, - modify, display, perform, distribute, and otherwise exploit its - Contributions, either on an unmodified basis, with Modifications, or - as part of a Larger Work; and - - b. under Patent Claims of such Contributor to make, use, sell, offer for - sale, have made, import, and otherwise transfer either its - Contributions or its Contributor Version. - -2.2. Effective Date - - The licenses granted in Section 2.1 with respect to any Contribution - become effective for each Contribution on the date the Contributor first - distributes such Contribution. - -2.3. Limitations on Grant Scope - - The licenses granted in this Section 2 are the only rights granted under - this License. No additional rights or licenses will be implied from the - distribution or licensing of Covered Software under this License. - Notwithstanding Section 2.1(b) above, no patent license is granted by a - Contributor: - - a. for any code that a Contributor has removed from Covered Software; or - - b. for infringements caused by: (i) Your and any other third party's - modifications of Covered Software, or (ii) the combination of its - Contributions with other software (except as part of its Contributor - Version); or - - c. under Patent Claims infringed by Covered Software in the absence of - its Contributions. - - This License does not grant any rights in the trademarks, service marks, - or logos of any Contributor (except as may be necessary to comply with - the notice requirements in Section 3.4). - -2.4. Subsequent Licenses - - No Contributor makes additional grants as a result of Your choice to - distribute the Covered Software under a subsequent version of this - License (see Section 10.2) or under the terms of a Secondary License (if - permitted under the terms of Section 3.3). - -2.5. Representation - - Each Contributor represents that the Contributor believes its - Contributions are its original creation(s) or it has sufficient rights to - grant the rights to its Contributions conveyed by this License. - -2.6. Fair Use - - This License is not intended to limit any rights You have under - applicable copyright doctrines of fair use, fair dealing, or other - equivalents. - -2.7. Conditions - - Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in - Section 2.1. - - -3. Responsibilities - -3.1. Distribution of Source Form - - All distribution of Covered Software in Source Code Form, including any - Modifications that You create or to which You contribute, must be under - the terms of this License. You must inform recipients that the Source - Code Form of the Covered Software is governed by the terms of this - License, and how they can obtain a copy of this License. You may not - attempt to alter or restrict the recipients' rights in the Source Code - Form. - -3.2. Distribution of Executable Form - - If You distribute Covered Software in Executable Form then: - - a. such Covered Software must also be made available in Source Code Form, - as described in Section 3.1, and You must inform recipients of the - Executable Form how they can obtain a copy of such Source Code Form by - reasonable means in a timely manner, at a charge no more than the cost - of distribution to the recipient; and - - b. You may distribute such Executable Form under the terms of this - License, or sublicense it under different terms, provided that the - license for the Executable Form does not attempt to limit or alter the - recipients' rights in the Source Code Form under this License. - -3.3. Distribution of a Larger Work - - You may create and distribute a Larger Work under terms of Your choice, - provided that You also comply with the requirements of this License for - the Covered Software. If the Larger Work is a combination of Covered - Software with a work governed by one or more Secondary Licenses, and the - Covered Software is not Incompatible With Secondary Licenses, this - License permits You to additionally distribute such Covered Software - under the terms of such Secondary License(s), so that the recipient of - the Larger Work may, at their option, further distribute the Covered - Software under the terms of either this License or such Secondary - License(s). - -3.4. Notices - - You may not remove or alter the substance of any license notices - (including copyright notices, patent notices, disclaimers of warranty, or - limitations of liability) contained within the Source Code Form of the - Covered Software, except that You may alter any license notices to the - extent required to remedy known factual inaccuracies. - -3.5. Application of Additional Terms - - You may choose to offer, and to charge a fee for, warranty, support, - indemnity or liability obligations to one or more recipients of Covered - Software. However, You may do so only on Your own behalf, and not on - behalf of any Contributor. You must make it absolutely clear that any - such warranty, support, indemnity, or liability obligation is offered by - You alone, and You hereby agree to indemnify every Contributor for any - liability incurred by such Contributor as a result of warranty, support, - indemnity or liability terms You offer. You may include additional - disclaimers of warranty and limitations of liability specific to any - jurisdiction. - -4. Inability to Comply Due to Statute or Regulation - - If it is impossible for You to comply with any of the terms of this License - with respect to some or all of the Covered Software due to statute, - judicial order, or regulation then You must: (a) comply with the terms of - this License to the maximum extent possible; and (b) describe the - limitations and the code they affect. Such description must be placed in a - text file included with all distributions of the Covered Software under - this License. Except to the extent prohibited by statute or regulation, - such description must be sufficiently detailed for a recipient of ordinary - skill to be able to understand it. - -5. Termination - -5.1. The rights granted under this License will terminate automatically if You - fail to comply with any of its terms. However, if You become compliant, - then the rights granted under this License from a particular Contributor - are reinstated (a) provisionally, unless and until such Contributor - explicitly and finally terminates Your grants, and (b) on an ongoing - basis, if such Contributor fails to notify You of the non-compliance by - some reasonable means prior to 60 days after You have come back into - compliance. Moreover, Your grants from a particular Contributor are - reinstated on an ongoing basis if such Contributor notifies You of the - non-compliance by some reasonable means, this is the first time You have - received notice of non-compliance with this License from such - Contributor, and You become compliant prior to 30 days after Your receipt - of the notice. - -5.2. If You initiate litigation against any entity by asserting a patent - infringement claim (excluding declaratory judgment actions, - counter-claims, and cross-claims) alleging that a Contributor Version - directly or indirectly infringes any patent, then the rights granted to - You by any and all Contributors for the Covered Software under Section - 2.1 of this License shall terminate. - -5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user - license agreements (excluding distributors and resellers) which have been - validly granted by You or Your distributors under this License prior to - termination shall survive termination. - -6. Disclaimer of Warranty - - Covered Software is provided under this License on an "as is" basis, - without warranty of any kind, either expressed, implied, or statutory, - including, without limitation, warranties that the Covered Software is free - of defects, merchantable, fit for a particular purpose or non-infringing. - The entire risk as to the quality and performance of the Covered Software - is with You. Should any Covered Software prove defective in any respect, - You (not any Contributor) assume the cost of any necessary servicing, - repair, or correction. This disclaimer of warranty constitutes an essential - part of this License. No use of any Covered Software is authorized under - this License except under this disclaimer. - -7. Limitation of Liability - - Under no circumstances and under no legal theory, whether tort (including - negligence), contract, or otherwise, shall any Contributor, or anyone who - distributes Covered Software as permitted above, be liable to You for any - direct, indirect, special, incidental, or consequential damages of any - character including, without limitation, damages for lost profits, loss of - goodwill, work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses, even if such party shall have been - informed of the possibility of such damages. This limitation of liability - shall not apply to liability for death or personal injury resulting from - such party's negligence to the extent applicable law prohibits such - limitation. Some jurisdictions do not allow the exclusion or limitation of - incidental or consequential damages, so this exclusion and limitation may - not apply to You. - -8. Litigation - - Any litigation relating to this License may be brought only in the courts - of a jurisdiction where the defendant maintains its principal place of - business and such litigation shall be governed by laws of that - jurisdiction, without reference to its conflict-of-law provisions. Nothing - in this Section shall prevent a party's ability to bring cross-claims or - counter-claims. - -9. Miscellaneous - - This License represents the complete agreement concerning the subject - matter hereof. If any provision of this License is held to be - unenforceable, such provision shall be reformed only to the extent - necessary to make it enforceable. Any law or regulation which provides that - the language of a contract shall be construed against the drafter shall not - be used to construe this License against a Contributor. - - -10. Versions of the License - -10.1. New Versions - - Mozilla Foundation is the license steward. Except as provided in Section - 10.3, no one other than the license steward has the right to modify or - publish new versions of this License. Each version will be given a - distinguishing version number. - -10.2. Effect of New Versions - - You may distribute the Covered Software under the terms of the version - of the License under which You originally received the Covered Software, - or under the terms of any subsequent version published by the license - steward. - -10.3. Modified Versions - - If you create software not governed by this License, and you want to - create a new license for such software, you may create and use a - modified version of this License if you rename the license and remove - any references to the name of the license steward (except to note that - such modified license differs from this License). - -10.4. Distributing Source Code Form that is Incompatible With Secondary - Licenses If You choose to distribute Source Code Form that is - Incompatible With Secondary Licenses under the terms of this version of - the License, the notice described in Exhibit B of this License must be - attached. - -Exhibit A - Source Code Form License Notice - - This Source Code Form is subject to the - terms of the Mozilla Public License, v. - 2.0. If a copy of the MPL was not - distributed with this file, You can - obtain one at - http://mozilla.org/MPL/2.0/. - -If it is not possible or desirable to put the notice in a particular file, -then You may include the notice in a location (such as a LICENSE file in a -relevant directory) where a recipient would be likely to look for such a -notice. - -You may add additional accurate notices of copyright ownership. - -Exhibit B - "Incompatible With Secondary Licenses" Notice - - This Source Code Form is "Incompatible - With Secondary Licenses", as defined by - the Mozilla Public License, v. 2.0. \ No newline at end of file diff --git a/vendor/github.com/hashicorp/yamux/addr.go b/vendor/github.com/hashicorp/yamux/addr.go deleted file mode 100644 index be6ebca9c..000000000 --- a/vendor/github.com/hashicorp/yamux/addr.go +++ /dev/null @@ -1,60 +0,0 @@ -package yamux - -import ( - "fmt" - "net" -) - -// hasAddr is used to get the address from the underlying connection -type hasAddr interface { - LocalAddr() net.Addr - RemoteAddr() net.Addr -} - -// yamuxAddr is used when we cannot get the underlying address -type yamuxAddr struct { - Addr string -} - -func (*yamuxAddr) Network() string { - return "yamux" -} - -func (y *yamuxAddr) String() string { - return fmt.Sprintf("yamux:%s", y.Addr) -} - -// Addr is used to get the address of the listener. -func (s *Session) Addr() net.Addr { - return s.LocalAddr() -} - -// LocalAddr is used to get the local address of the -// underlying connection. -func (s *Session) LocalAddr() net.Addr { - addr, ok := s.conn.(hasAddr) - if !ok { - return &yamuxAddr{"local"} - } - return addr.LocalAddr() -} - -// RemoteAddr is used to get the address of remote end -// of the underlying connection -func (s *Session) RemoteAddr() net.Addr { - addr, ok := s.conn.(hasAddr) - if !ok { - return &yamuxAddr{"remote"} - } - return addr.RemoteAddr() -} - -// LocalAddr returns the local address -func (s *Stream) LocalAddr() net.Addr { - return s.session.LocalAddr() -} - -// LocalAddr returns the remote address -func (s *Stream) RemoteAddr() net.Addr { - return s.session.RemoteAddr() -} diff --git a/vendor/github.com/hashicorp/yamux/const.go b/vendor/github.com/hashicorp/yamux/const.go deleted file mode 100644 index 4f5293828..000000000 --- a/vendor/github.com/hashicorp/yamux/const.go +++ /dev/null @@ -1,157 +0,0 @@ -package yamux - -import ( - "encoding/binary" - "fmt" -) - -var ( - // ErrInvalidVersion means we received a frame with an - // invalid version - ErrInvalidVersion = fmt.Errorf("invalid protocol version") - - // ErrInvalidMsgType means we received a frame with an - // invalid message type - ErrInvalidMsgType = fmt.Errorf("invalid msg type") - - // ErrSessionShutdown is used if there is a shutdown during - // an operation - ErrSessionShutdown = fmt.Errorf("session shutdown") - - // ErrStreamsExhausted is returned if we have no more - // stream ids to issue - ErrStreamsExhausted = fmt.Errorf("streams exhausted") - - // ErrDuplicateStream is used if a duplicate stream is - // opened inbound - ErrDuplicateStream = fmt.Errorf("duplicate stream initiated") - - // ErrReceiveWindowExceeded indicates the window was exceeded - ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded") - - // ErrTimeout is used when we reach an IO deadline - ErrTimeout = fmt.Errorf("i/o deadline reached") - - // ErrStreamClosed is returned when using a closed stream - ErrStreamClosed = fmt.Errorf("stream closed") - - // ErrUnexpectedFlag is set when we get an unexpected flag - ErrUnexpectedFlag = fmt.Errorf("unexpected flag") - - // ErrRemoteGoAway is used when we get a go away from the other side - ErrRemoteGoAway = fmt.Errorf("remote end is not accepting connections") - - // ErrConnectionReset is sent if a stream is reset. This can happen - // if the backlog is exceeded, or if there was a remote GoAway. - ErrConnectionReset = fmt.Errorf("connection reset") - - // ErrConnectionWriteTimeout indicates that we hit the "safety valve" - // timeout writing to the underlying stream connection. - ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout") - - // ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close - ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout") -) - -const ( - // protoVersion is the only version we support - protoVersion uint8 = 0 -) - -const ( - // Data is used for data frames. They are followed - // by length bytes worth of payload. - typeData uint8 = iota - - // WindowUpdate is used to change the window of - // a given stream. The length indicates the delta - // update to the window. - typeWindowUpdate - - // Ping is sent as a keep-alive or to measure - // the RTT. The StreamID and Length value are echoed - // back in the response. - typePing - - // GoAway is sent to terminate a session. The StreamID - // should be 0 and the length is an error code. - typeGoAway -) - -const ( - // SYN is sent to signal a new stream. May - // be sent with a data payload - flagSYN uint16 = 1 << iota - - // ACK is sent to acknowledge a new stream. May - // be sent with a data payload - flagACK - - // FIN is sent to half-close the given stream. - // May be sent with a data payload. - flagFIN - - // RST is used to hard close a given stream. - flagRST -) - -const ( - // initialStreamWindow is the initial stream window size - initialStreamWindow uint32 = 256 * 1024 -) - -const ( - // goAwayNormal is sent on a normal termination - goAwayNormal uint32 = iota - - // goAwayProtoErr sent on a protocol error - goAwayProtoErr - - // goAwayInternalErr sent on an internal error - goAwayInternalErr -) - -const ( - sizeOfVersion = 1 - sizeOfType = 1 - sizeOfFlags = 2 - sizeOfStreamID = 4 - sizeOfLength = 4 - headerSize = sizeOfVersion + sizeOfType + sizeOfFlags + - sizeOfStreamID + sizeOfLength -) - -type header []byte - -func (h header) Version() uint8 { - return h[0] -} - -func (h header) MsgType() uint8 { - return h[1] -} - -func (h header) Flags() uint16 { - return binary.BigEndian.Uint16(h[2:4]) -} - -func (h header) StreamID() uint32 { - return binary.BigEndian.Uint32(h[4:8]) -} - -func (h header) Length() uint32 { - return binary.BigEndian.Uint32(h[8:12]) -} - -func (h header) String() string { - return fmt.Sprintf("Vsn:%d Type:%d Flags:%d StreamID:%d Length:%d", - h.Version(), h.MsgType(), h.Flags(), h.StreamID(), h.Length()) -} - -func (h header) encode(msgType uint8, flags uint16, streamID uint32, length uint32) { - h[0] = protoVersion - h[1] = msgType - binary.BigEndian.PutUint16(h[2:4], flags) - binary.BigEndian.PutUint32(h[4:8], streamID) - binary.BigEndian.PutUint32(h[8:12], length) -} diff --git a/vendor/github.com/hashicorp/yamux/mux.go b/vendor/github.com/hashicorp/yamux/mux.go deleted file mode 100644 index 7abc7c744..000000000 --- a/vendor/github.com/hashicorp/yamux/mux.go +++ /dev/null @@ -1,87 +0,0 @@ -package yamux - -import ( - "fmt" - "io" - "os" - "time" -) - -// Config is used to tune the Yamux session -type Config struct { - // AcceptBacklog is used to limit how many streams may be - // waiting an accept. - AcceptBacklog int - - // EnableKeepalive is used to do a period keep alive - // messages using a ping. - EnableKeepAlive bool - - // KeepAliveInterval is how often to perform the keep alive - KeepAliveInterval time.Duration - - // ConnectionWriteTimeout is meant to be a "safety valve" timeout after - // we which will suspect a problem with the underlying connection and - // close it. This is only applied to writes, where's there's generally - // an expectation that things will move along quickly. - ConnectionWriteTimeout time.Duration - - // MaxStreamWindowSize is used to control the maximum - // window size that we allow for a stream. - MaxStreamWindowSize uint32 - - // LogOutput is used to control the log destination - LogOutput io.Writer -} - -// DefaultConfig is used to return a default configuration -func DefaultConfig() *Config { - return &Config{ - AcceptBacklog: 256, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: initialStreamWindow, - LogOutput: os.Stderr, - } -} - -// VerifyConfig is used to verify the sanity of configuration -func VerifyConfig(config *Config) error { - if config.AcceptBacklog <= 0 { - return fmt.Errorf("backlog must be positive") - } - if config.KeepAliveInterval == 0 { - return fmt.Errorf("keep-alive interval must be positive") - } - if config.MaxStreamWindowSize < initialStreamWindow { - return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow) - } - return nil -} - -// Server is used to initialize a new server-side connection. -// There must be at most one server-side connection. If a nil config is -// provided, the DefaultConfiguration will be used. -func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { - if config == nil { - config = DefaultConfig() - } - if err := VerifyConfig(config); err != nil { - return nil, err - } - return newSession(config, conn, false), nil -} - -// Client is used to initialize a new client-side connection. -// There must be at most one client-side connection. -func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { - if config == nil { - config = DefaultConfig() - } - - if err := VerifyConfig(config); err != nil { - return nil, err - } - return newSession(config, conn, true), nil -} diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go deleted file mode 100644 index e17981839..000000000 --- a/vendor/github.com/hashicorp/yamux/session.go +++ /dev/null @@ -1,623 +0,0 @@ -package yamux - -import ( - "bufio" - "fmt" - "io" - "io/ioutil" - "log" - "math" - "net" - "strings" - "sync" - "sync/atomic" - "time" -) - -// Session is used to wrap a reliable ordered connection and to -// multiplex it into multiple streams. -type Session struct { - // remoteGoAway indicates the remote side does - // not want futher connections. Must be first for alignment. - remoteGoAway int32 - - // localGoAway indicates that we should stop - // accepting futher connections. Must be first for alignment. - localGoAway int32 - - // nextStreamID is the next stream we should - // send. This depends if we are a client/server. - nextStreamID uint32 - - // config holds our configuration - config *Config - - // logger is used for our logs - logger *log.Logger - - // conn is the underlying connection - conn io.ReadWriteCloser - - // bufRead is a buffered reader - bufRead *bufio.Reader - - // pings is used to track inflight pings - pings map[uint32]chan struct{} - pingID uint32 - pingLock sync.Mutex - - // streams maps a stream id to a stream, and inflight has an entry - // for any outgoing stream that has not yet been established. Both are - // protected by streamLock. - streams map[uint32]*Stream - inflight map[uint32]struct{} - streamLock sync.Mutex - - // synCh acts like a semaphore. It is sized to the AcceptBacklog which - // is assumed to be symmetric between the client and server. This allows - // the client to avoid exceeding the backlog and instead blocks the open. - synCh chan struct{} - - // acceptCh is used to pass ready streams to the client - acceptCh chan *Stream - - // sendCh is used to mark a stream as ready to send, - // or to send a header out directly. - sendCh chan sendReady - - // recvDoneCh is closed when recv() exits to avoid a race - // between stream registration and stream shutdown - recvDoneCh chan struct{} - - // shutdown is used to safely close a session - shutdown bool - shutdownErr error - shutdownCh chan struct{} - shutdownLock sync.Mutex -} - -// sendReady is used to either mark a stream as ready -// or to directly send a header -type sendReady struct { - Hdr []byte - Body io.Reader - Err chan error -} - -// newSession is used to construct a new session -func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { - s := &Session{ - config: config, - logger: log.New(config.LogOutput, "", log.LstdFlags), - conn: conn, - bufRead: bufio.NewReader(conn), - pings: make(map[uint32]chan struct{}), - streams: make(map[uint32]*Stream), - inflight: make(map[uint32]struct{}), - synCh: make(chan struct{}, config.AcceptBacklog), - acceptCh: make(chan *Stream, config.AcceptBacklog), - sendCh: make(chan sendReady, 64), - recvDoneCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - } - if client { - s.nextStreamID = 1 - } else { - s.nextStreamID = 2 - } - go s.recv() - go s.send() - if config.EnableKeepAlive { - go s.keepalive() - } - return s -} - -// IsClosed does a safe check to see if we have shutdown -func (s *Session) IsClosed() bool { - select { - case <-s.shutdownCh: - return true - default: - return false - } -} - -// NumStreams returns the number of currently open streams -func (s *Session) NumStreams() int { - s.streamLock.Lock() - num := len(s.streams) - s.streamLock.Unlock() - return num -} - -// Open is used to create a new stream as a net.Conn -func (s *Session) Open() (net.Conn, error) { - conn, err := s.OpenStream() - if err != nil { - return nil, err - } - return conn, nil -} - -// OpenStream is used to create a new stream -func (s *Session) OpenStream() (*Stream, error) { - if s.IsClosed() { - return nil, ErrSessionShutdown - } - if atomic.LoadInt32(&s.remoteGoAway) == 1 { - return nil, ErrRemoteGoAway - } - - // Block if we have too many inflight SYNs - select { - case s.synCh <- struct{}{}: - case <-s.shutdownCh: - return nil, ErrSessionShutdown - } - -GET_ID: - // Get an ID, and check for stream exhaustion - id := atomic.LoadUint32(&s.nextStreamID) - if id >= math.MaxUint32-1 { - return nil, ErrStreamsExhausted - } - if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { - goto GET_ID - } - - // Register the stream - stream := newStream(s, id, streamInit) - s.streamLock.Lock() - s.streams[id] = stream - s.inflight[id] = struct{}{} - s.streamLock.Unlock() - - // Send the window update to create - if err := stream.sendWindowUpdate(); err != nil { - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") - } - return nil, err - } - return stream, nil -} - -// Accept is used to block until the next available stream -// is ready to be accepted. -func (s *Session) Accept() (net.Conn, error) { - conn, err := s.AcceptStream() - if err != nil { - return nil, err - } - return conn, err -} - -// AcceptStream is used to block until the next available stream -// is ready to be accepted. -func (s *Session) AcceptStream() (*Stream, error) { - select { - case stream := <-s.acceptCh: - if err := stream.sendWindowUpdate(); err != nil { - return nil, err - } - return stream, nil - case <-s.shutdownCh: - return nil, s.shutdownErr - } -} - -// Close is used to close the session and all streams. -// Attempts to send a GoAway before closing the connection. -func (s *Session) Close() error { - s.shutdownLock.Lock() - defer s.shutdownLock.Unlock() - - if s.shutdown { - return nil - } - s.shutdown = true - if s.shutdownErr == nil { - s.shutdownErr = ErrSessionShutdown - } - close(s.shutdownCh) - s.conn.Close() - <-s.recvDoneCh - - s.streamLock.Lock() - defer s.streamLock.Unlock() - for _, stream := range s.streams { - stream.forceClose() - } - return nil -} - -// exitErr is used to handle an error that is causing the -// session to terminate. -func (s *Session) exitErr(err error) { - s.shutdownLock.Lock() - if s.shutdownErr == nil { - s.shutdownErr = err - } - s.shutdownLock.Unlock() - s.Close() -} - -// GoAway can be used to prevent accepting further -// connections. It does not close the underlying conn. -func (s *Session) GoAway() error { - return s.waitForSend(s.goAway(goAwayNormal), nil) -} - -// goAway is used to send a goAway message -func (s *Session) goAway(reason uint32) header { - atomic.SwapInt32(&s.localGoAway, 1) - hdr := header(make([]byte, headerSize)) - hdr.encode(typeGoAway, 0, 0, reason) - return hdr -} - -// Ping is used to measure the RTT response time -func (s *Session) Ping() (time.Duration, error) { - // Get a channel for the ping - ch := make(chan struct{}) - - // Get a new ping id, mark as pending - s.pingLock.Lock() - id := s.pingID - s.pingID++ - s.pings[id] = ch - s.pingLock.Unlock() - - // Send the ping request - hdr := header(make([]byte, headerSize)) - hdr.encode(typePing, flagSYN, 0, id) - if err := s.waitForSend(hdr, nil); err != nil { - return 0, err - } - - // Wait for a response - start := time.Now() - select { - case <-ch: - case <-time.After(s.config.ConnectionWriteTimeout): - s.pingLock.Lock() - delete(s.pings, id) // Ignore it if a response comes later. - s.pingLock.Unlock() - return 0, ErrTimeout - case <-s.shutdownCh: - return 0, ErrSessionShutdown - } - - // Compute the RTT - return time.Now().Sub(start), nil -} - -// keepalive is a long running goroutine that periodically does -// a ping to keep the connection alive. -func (s *Session) keepalive() { - for { - select { - case <-time.After(s.config.KeepAliveInterval): - _, err := s.Ping() - if err != nil { - s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) - s.exitErr(ErrKeepAliveTimeout) - return - } - case <-s.shutdownCh: - return - } - } -} - -// waitForSendErr waits to send a header, checking for a potential shutdown -func (s *Session) waitForSend(hdr header, body io.Reader) error { - errCh := make(chan error, 1) - return s.waitForSendErr(hdr, body, errCh) -} - -// waitForSendErr waits to send a header with optional data, checking for a -// potential shutdown. Since there's the expectation that sends can happen -// in a timely manner, we enforce the connection write timeout here. -func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() - - ready := sendReady{Hdr: hdr, Body: body, Err: errCh} - select { - case s.sendCh <- ready: - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } - - select { - case err := <-errCh: - return err - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } -} - -// sendNoWait does a send without waiting. Since there's the expectation that -// the send happens right here, we enforce the connection write timeout if we -// can't queue the header to be sent. -func (s *Session) sendNoWait(hdr header) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() - - select { - case s.sendCh <- sendReady{Hdr: hdr}: - return nil - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } -} - -// send is a long running goroutine that sends data -func (s *Session) send() { - for { - select { - case ready := <-s.sendCh: - // Send a header if ready - if ready.Hdr != nil { - sent := 0 - for sent < len(ready.Hdr) { - n, err := s.conn.Write(ready.Hdr[sent:]) - if err != nil { - s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } - sent += n - } - } - - // Send data from a body if given - if ready.Body != nil { - _, err := io.Copy(s.conn, ready.Body) - if err != nil { - s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } - } - - // No error, successful send - asyncSendErr(ready.Err, nil) - case <-s.shutdownCh: - return - } - } -} - -// recv is a long running goroutine that accepts new data -func (s *Session) recv() { - if err := s.recvLoop(); err != nil { - s.exitErr(err) - } -} - -// recvLoop continues to receive data until a fatal error is encountered -func (s *Session) recvLoop() error { - defer close(s.recvDoneCh) - hdr := header(make([]byte, headerSize)) - var handler func(header) error - for { - // Read the header - if _, err := io.ReadFull(s.bufRead, hdr); err != nil { - if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { - s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) - } - return err - } - - // Verify the version - if hdr.Version() != protoVersion { - s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) - return ErrInvalidVersion - } - - // Switch on the type - switch hdr.MsgType() { - case typeData: - handler = s.handleStreamMessage - case typeWindowUpdate: - handler = s.handleStreamMessage - case typeGoAway: - handler = s.handleGoAway - case typePing: - handler = s.handlePing - default: - return ErrInvalidMsgType - } - - // Invoke the handler - if err := handler(hdr); err != nil { - return err - } - } -} - -// handleStreamMessage handles either a data or window update frame -func (s *Session) handleStreamMessage(hdr header) error { - // Check for a new stream creation - id := hdr.StreamID() - flags := hdr.Flags() - if flags&flagSYN == flagSYN { - if err := s.incomingStream(id); err != nil { - return err - } - } - - // Get the stream - s.streamLock.Lock() - stream := s.streams[id] - s.streamLock.Unlock() - - // If we do not have a stream, likely we sent a RST - if stream == nil { - // Drain any data on the wire - if hdr.MsgType() == typeData && hdr.Length() > 0 { - s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) - if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { - s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) - return nil - } - } else { - s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) - } - return nil - } - - // Check if this is a window update - if hdr.MsgType() == typeWindowUpdate { - if err := stream.incrSendWindow(hdr, flags); err != nil { - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return err - } - return nil - } - - // Read the new data - if err := stream.readData(hdr, flags, s.bufRead); err != nil { - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return err - } - return nil -} - -// handlePing is invokde for a typePing frame -func (s *Session) handlePing(hdr header) error { - flags := hdr.Flags() - pingID := hdr.Length() - - // Check if this is a query, respond back in a separate context so we - // don't interfere with the receiving thread blocking for the write. - if flags&flagSYN == flagSYN { - go func() { - hdr := header(make([]byte, headerSize)) - hdr.encode(typePing, flagACK, 0, pingID) - if err := s.sendNoWait(hdr); err != nil { - s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) - } - }() - return nil - } - - // Handle a response - s.pingLock.Lock() - ch := s.pings[pingID] - if ch != nil { - delete(s.pings, pingID) - close(ch) - } - s.pingLock.Unlock() - return nil -} - -// handleGoAway is invokde for a typeGoAway frame -func (s *Session) handleGoAway(hdr header) error { - code := hdr.Length() - switch code { - case goAwayNormal: - atomic.SwapInt32(&s.remoteGoAway, 1) - case goAwayProtoErr: - s.logger.Printf("[ERR] yamux: received protocol error go away") - return fmt.Errorf("yamux protocol error") - case goAwayInternalErr: - s.logger.Printf("[ERR] yamux: received internal error go away") - return fmt.Errorf("remote yamux internal error") - default: - s.logger.Printf("[ERR] yamux: received unexpected go away") - return fmt.Errorf("unexpected go away received") - } - return nil -} - -// incomingStream is used to create a new incoming stream -func (s *Session) incomingStream(id uint32) error { - // Reject immediately if we are doing a go away - if atomic.LoadInt32(&s.localGoAway) == 1 { - hdr := header(make([]byte, headerSize)) - hdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(hdr) - } - - // Allocate a new stream - stream := newStream(s, id, streamSYNReceived) - - s.streamLock.Lock() - defer s.streamLock.Unlock() - - // Check if stream already exists - if _, ok := s.streams[id]; ok { - s.logger.Printf("[ERR] yamux: duplicate stream declared") - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return ErrDuplicateStream - } - - // Register the stream - s.streams[id] = stream - - // Check if we've exceeded the backlog - select { - case s.acceptCh <- stream: - return nil - default: - // Backlog exceeded! RST the stream - s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") - delete(s.streams, id) - stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(stream.sendHdr) - } -} - -// closeStream is used to close a stream once both sides have -// issued a close. If there was an in-flight SYN and the stream -// was not yet established, then this will give the credit back. -func (s *Session) closeStream(id uint32) { - s.streamLock.Lock() - if _, ok := s.inflight[id]; ok { - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: SYN tracking out of sync") - } - } - delete(s.streams, id) - s.streamLock.Unlock() -} - -// establishStream is used to mark a stream that was in the -// SYN Sent state as established. -func (s *Session) establishStream(id uint32) { - s.streamLock.Lock() - if _, ok := s.inflight[id]; ok { - delete(s.inflight, id) - } else { - s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") - } - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") - } - s.streamLock.Unlock() -} diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go deleted file mode 100644 index d216e281c..000000000 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ /dev/null @@ -1,457 +0,0 @@ -package yamux - -import ( - "bytes" - "io" - "sync" - "sync/atomic" - "time" -) - -type streamState int - -const ( - streamInit streamState = iota - streamSYNSent - streamSYNReceived - streamEstablished - streamLocalClose - streamRemoteClose - streamClosed - streamReset -) - -// Stream is used to represent a logical stream -// within a session. -type Stream struct { - recvWindow uint32 - sendWindow uint32 - - id uint32 - session *Session - - state streamState - stateLock sync.Mutex - - recvBuf *bytes.Buffer - recvLock sync.Mutex - - controlHdr header - controlErr chan error - controlHdrLock sync.Mutex - - sendHdr header - sendErr chan error - sendLock sync.Mutex - - recvNotifyCh chan struct{} - sendNotifyCh chan struct{} - - readDeadline time.Time - writeDeadline time.Time -} - -// newStream is used to construct a new stream within -// a given session for an ID -func newStream(session *Session, id uint32, state streamState) *Stream { - s := &Stream{ - id: id, - session: session, - state: state, - controlHdr: header(make([]byte, headerSize)), - controlErr: make(chan error, 1), - sendHdr: header(make([]byte, headerSize)), - sendErr: make(chan error, 1), - recvWindow: initialStreamWindow, - sendWindow: initialStreamWindow, - recvNotifyCh: make(chan struct{}, 1), - sendNotifyCh: make(chan struct{}, 1), - } - return s -} - -// Session returns the associated stream session -func (s *Stream) Session() *Session { - return s.session -} - -// StreamID returns the ID of this stream -func (s *Stream) StreamID() uint32 { - return s.id -} - -// Read is used to read from the stream -func (s *Stream) Read(b []byte) (n int, err error) { - defer asyncNotify(s.recvNotifyCh) -START: - s.stateLock.Lock() - switch s.state { - case streamLocalClose: - fallthrough - case streamRemoteClose: - fallthrough - case streamClosed: - s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { - s.recvLock.Unlock() - s.stateLock.Unlock() - return 0, io.EOF - } - s.recvLock.Unlock() - case streamReset: - s.stateLock.Unlock() - return 0, ErrConnectionReset - } - s.stateLock.Unlock() - - // If there is no data available, block - s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { - s.recvLock.Unlock() - goto WAIT - } - - // Read any bytes - n, _ = s.recvBuf.Read(b) - s.recvLock.Unlock() - - // Send a window update potentially - err = s.sendWindowUpdate() - return n, err - -WAIT: - var timeout <-chan time.Time - var timer *time.Timer - if !s.readDeadline.IsZero() { - delay := s.readDeadline.Sub(time.Now()) - timer = time.NewTimer(delay) - timeout = timer.C - } - select { - case <-s.recvNotifyCh: - if timer != nil { - timer.Stop() - } - goto START - case <-timeout: - return 0, ErrTimeout - } -} - -// Write is used to write to the stream -func (s *Stream) Write(b []byte) (n int, err error) { - s.sendLock.Lock() - defer s.sendLock.Unlock() - total := 0 - for total < len(b) { - n, err := s.write(b[total:]) - total += n - if err != nil { - return total, err - } - } - return total, nil -} - -// write is used to write to the stream, may return on -// a short write. -func (s *Stream) write(b []byte) (n int, err error) { - var flags uint16 - var max uint32 - var body io.Reader -START: - s.stateLock.Lock() - switch s.state { - case streamLocalClose: - fallthrough - case streamClosed: - s.stateLock.Unlock() - return 0, ErrStreamClosed - case streamReset: - s.stateLock.Unlock() - return 0, ErrConnectionReset - } - s.stateLock.Unlock() - - // If there is no data available, block - window := atomic.LoadUint32(&s.sendWindow) - if window == 0 { - goto WAIT - } - - // Determine the flags if any - flags = s.sendFlags() - - // Send up to our send window - max = min(window, uint32(len(b))) - body = bytes.NewReader(b[:max]) - - // Send the header - s.sendHdr.encode(typeData, flags, s.id, max) - if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { - return 0, err - } - - // Reduce our send window - atomic.AddUint32(&s.sendWindow, ^uint32(max-1)) - - // Unlock - return int(max), err - -WAIT: - var timeout <-chan time.Time - if !s.writeDeadline.IsZero() { - delay := s.writeDeadline.Sub(time.Now()) - timeout = time.After(delay) - } - select { - case <-s.sendNotifyCh: - goto START - case <-timeout: - return 0, ErrTimeout - } - return 0, nil -} - -// sendFlags determines any flags that are appropriate -// based on the current stream state -func (s *Stream) sendFlags() uint16 { - s.stateLock.Lock() - defer s.stateLock.Unlock() - var flags uint16 - switch s.state { - case streamInit: - flags |= flagSYN - s.state = streamSYNSent - case streamSYNReceived: - flags |= flagACK - s.state = streamEstablished - } - return flags -} - -// sendWindowUpdate potentially sends a window update enabling -// further writes to take place. Must be invoked with the lock. -func (s *Stream) sendWindowUpdate() error { - s.controlHdrLock.Lock() - defer s.controlHdrLock.Unlock() - - // Determine the delta update - max := s.session.config.MaxStreamWindowSize - delta := max - atomic.LoadUint32(&s.recvWindow) - - // Determine the flags if any - flags := s.sendFlags() - - // Check if we can omit the update - if delta < (max/2) && flags == 0 { - return nil - } - - // Update our window - atomic.AddUint32(&s.recvWindow, delta) - - // Send the header - s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) - if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { - return err - } - return nil -} - -// sendClose is used to send a FIN -func (s *Stream) sendClose() error { - s.controlHdrLock.Lock() - defer s.controlHdrLock.Unlock() - - flags := s.sendFlags() - flags |= flagFIN - s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0) - if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { - return err - } - return nil -} - -// Close is used to close the stream -func (s *Stream) Close() error { - closeStream := false - s.stateLock.Lock() - switch s.state { - // Opened means we need to signal a close - case streamSYNSent: - fallthrough - case streamSYNReceived: - fallthrough - case streamEstablished: - s.state = streamLocalClose - goto SEND_CLOSE - - case streamLocalClose: - case streamRemoteClose: - s.state = streamClosed - closeStream = true - goto SEND_CLOSE - - case streamClosed: - case streamReset: - default: - panic("unhandled state") - } - s.stateLock.Unlock() - return nil -SEND_CLOSE: - s.stateLock.Unlock() - s.sendClose() - s.notifyWaiting() - if closeStream { - s.session.closeStream(s.id) - } - return nil -} - -// forceClose is used for when the session is exiting -func (s *Stream) forceClose() { - s.stateLock.Lock() - s.state = streamClosed - s.stateLock.Unlock() - s.notifyWaiting() -} - -// processFlags is used to update the state of the stream -// based on set flags, if any. Lock must be held -func (s *Stream) processFlags(flags uint16) error { - // Close the stream without holding the state lock - closeStream := false - defer func() { - if closeStream { - s.session.closeStream(s.id) - } - }() - - s.stateLock.Lock() - defer s.stateLock.Unlock() - if flags&flagACK == flagACK { - if s.state == streamSYNSent { - s.state = streamEstablished - } - s.session.establishStream(s.id) - } - if flags&flagFIN == flagFIN { - switch s.state { - case streamSYNSent: - fallthrough - case streamSYNReceived: - fallthrough - case streamEstablished: - s.state = streamRemoteClose - s.notifyWaiting() - case streamLocalClose: - s.state = streamClosed - closeStream = true - s.notifyWaiting() - default: - s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state) - return ErrUnexpectedFlag - } - } - if flags&flagRST == flagRST { - s.state = streamReset - closeStream = true - s.notifyWaiting() - } - return nil -} - -// notifyWaiting notifies all the waiting channels -func (s *Stream) notifyWaiting() { - asyncNotify(s.recvNotifyCh) - asyncNotify(s.sendNotifyCh) -} - -// incrSendWindow updates the size of our send window -func (s *Stream) incrSendWindow(hdr header, flags uint16) error { - if err := s.processFlags(flags); err != nil { - return err - } - - // Increase window, unblock a sender - atomic.AddUint32(&s.sendWindow, hdr.Length()) - asyncNotify(s.sendNotifyCh) - return nil -} - -// readData is used to handle a data frame -func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { - if err := s.processFlags(flags); err != nil { - return err - } - - // Check that our recv window is not exceeded - length := hdr.Length() - if length == 0 { - return nil - } - if remain := atomic.LoadUint32(&s.recvWindow); length > remain { - s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length) - return ErrRecvWindowExceeded - } - - // Wrap in a limited reader - conn = &io.LimitedReader{R: conn, N: int64(length)} - - // Copy into buffer - s.recvLock.Lock() - if s.recvBuf == nil { - // Allocate the receive buffer just-in-time to fit the full data frame. - // This way we can read in the whole packet without further allocations. - s.recvBuf = bytes.NewBuffer(make([]byte, 0, length)) - } - if _, err := io.Copy(s.recvBuf, conn); err != nil { - s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err) - s.recvLock.Unlock() - return err - } - - // Decrement the receive window - atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) - s.recvLock.Unlock() - - // Unblock any readers - asyncNotify(s.recvNotifyCh) - return nil -} - -// SetDeadline sets the read and write deadlines -func (s *Stream) SetDeadline(t time.Time) error { - if err := s.SetReadDeadline(t); err != nil { - return err - } - if err := s.SetWriteDeadline(t); err != nil { - return err - } - return nil -} - -// SetReadDeadline sets the deadline for future Read calls. -func (s *Stream) SetReadDeadline(t time.Time) error { - s.readDeadline = t - return nil -} - -// SetWriteDeadline sets the deadline for future Write calls -func (s *Stream) SetWriteDeadline(t time.Time) error { - s.writeDeadline = t - return nil -} - -// Shrink is used to compact the amount of buffers utilized -// This is useful when using Yamux in a connection pool to reduce -// the idle memory utilization. -func (s *Stream) Shrink() { - s.recvLock.Lock() - if s.recvBuf != nil && s.recvBuf.Len() == 0 { - s.recvBuf = nil - } - s.recvLock.Unlock() -} diff --git a/vendor/github.com/hashicorp/yamux/util.go b/vendor/github.com/hashicorp/yamux/util.go deleted file mode 100644 index 5fe45afcd..000000000 --- a/vendor/github.com/hashicorp/yamux/util.go +++ /dev/null @@ -1,28 +0,0 @@ -package yamux - -// asyncSendErr is used to try an async send of an error -func asyncSendErr(ch chan error, err error) { - if ch == nil { - return - } - select { - case ch <- err: - default: - } -} - -// asyncNotify is used to signal a waiting goroutine -func asyncNotify(ch chan struct{}) { - select { - case ch <- struct{}{}: - default: - } -} - -// min computes the minimum of two values -func min(a, b uint32) uint32 { - if a < b { - return a - } - return b -} diff --git a/vendor/github.com/xtaci/kcp-go/emitter.go b/vendor/github.com/xtaci/kcp-go/emitter.go index 8c13118d8..778151ae4 100644 --- a/vendor/github.com/xtaci/kcp-go/emitter.go +++ b/vendor/github.com/xtaci/kcp-go/emitter.go @@ -7,17 +7,17 @@ import ( var defaultEmitter Emitter -const emitQueue = 8192 - func init() { defaultEmitter.init() } type ( + // packet emit request emitPacket struct { - conn net.PacketConn - to net.Addr - data []byte + conn net.PacketConn + to net.Addr + data []byte + // mark this packet should recycle to global xmitBuf recycle bool } @@ -28,7 +28,7 @@ type ( ) func (e *Emitter) init() { - e.ch = make(chan emitPacket, emitQueue) + e.ch = make(chan emitPacket) go e.emitTask() } @@ -36,7 +36,7 @@ func (e *Emitter) init() { func (e *Emitter) emitTask() { for p := range e.ch { if n, err := p.conn.WriteTo(p.data, p.to); err == nil { - atomic.AddUint64(&DefaultSnmp.OutSegs, 1) + atomic.AddUint64(&DefaultSnmp.OutPkts, 1) atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(n)) } if p.recycle { diff --git a/vendor/github.com/xtaci/kcp-go/kcp.go b/vendor/github.com/xtaci/kcp-go/kcp.go index 4ad88aa24..86efc3ab4 100644 --- a/vendor/github.com/xtaci/kcp-go/kcp.go +++ b/vendor/github.com/xtaci/kcp-go/kcp.go @@ -117,6 +117,7 @@ func (seg *Segment) encode(ptr []byte) []byte { ptr = ikcp_encode32u(ptr, seg.sn) ptr = ikcp_encode32u(ptr, seg.una) ptr = ikcp_encode32u(ptr, uint32(len(seg.data))) + atomic.AddUint64(&DefaultSnmp.OutSegs, 1) return ptr } @@ -484,9 +485,10 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { } var maxack uint32 + var lastackts uint32 var flag int + var inSegs uint64 - current := currentMs() for { var ts, sn, length, una, conv uint32 var wnd uint16 @@ -525,10 +527,6 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { kcp.shrink_buf() if cmd == IKCP_CMD_ACK { - if _itimediff(current, ts) >= 0 { - kcp.update_ack(_itimediff(current, ts)) - } - kcp.parse_ack(sn) kcp.shrink_buf() if flag == 0 { @@ -537,6 +535,7 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { } else if _itimediff(sn, maxack) > 0 { maxack = sn } + lastackts = ts } else if cmd == IKCP_CMD_PUSH { if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 { kcp.ack_push(sn, ts) @@ -567,11 +566,17 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { return -3 } + inSegs++ data = data[length:] } + atomic.AddUint64(&DefaultSnmp.InSegs, inSegs) if flag != 0 && regular { kcp.parse_fastack(maxack) + current := currentMs() + if _itimediff(current, lastackts) >= 0 { + kcp.update_ack(_itimediff(current, lastackts)) + } } if _itimediff(kcp.snd_una, una) > 0 { @@ -660,9 +665,9 @@ func (kcp *KCP) flush(ackOnly bool) { return } - current := currentMs() // probe window size (if remote window size equals zero) if kcp.rmt_wnd == 0 { + current := currentMs() if kcp.probe_wait == 0 { kcp.probe_wait = IKCP_PROBE_INIT kcp.ts_probe = current + kcp.probe_wait @@ -742,6 +747,7 @@ func (kcp *KCP) flush(ackOnly bool) { // send new segments for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ { + current := currentMs() segment := &kcp.snd_buf[k] segment.xmit++ segment.rto = kcp.rx_rto @@ -765,6 +771,7 @@ func (kcp *KCP) flush(ackOnly bool) { // check for retransmissions for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ { + current := currentMs() segment := &kcp.snd_buf[k] needsend := false if _itimediff(current, segment.resendts) >= 0 { // RTO diff --git a/vendor/github.com/xtaci/kcp-go/sess.go b/vendor/github.com/xtaci/kcp-go/sess.go index f5df379de..7ddfa797f 100644 --- a/vendor/github.com/xtaci/kcp-go/sess.go +++ b/vendor/github.com/xtaci/kcp-go/sess.go @@ -23,13 +23,23 @@ func (errTimeout) Temporary() bool { return true } func (errTimeout) Error() string { return "i/o timeout" } const ( - defaultWndSize = 128 // default window size, in packet - nonceSize = 16 // magic number - crcSize = 4 // 4bytes packet checksum + // 16-bytes magic number for each packet + nonceSize = 16 + + // 4-bytes packet checksum + crcSize = 4 + + // overall crypto header size cryptHeaderSize = nonceSize + crcSize - mtuLimit = 2048 - rxQueueLimit = 8192 - rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory + + // maximum packet size + mtuLimit = 2048 + + // packet receiving channel limit + rxQueueLimit = 2048 + + // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory + rxFECMulti = 3 ) const ( @@ -38,8 +48,12 @@ const ( ) var ( + // global packet buffer + // shared among sending/receiving/FEC xmitBuf sync.Pool - sid uint32 + + // monotonic session id + sid uint32 ) func init() { @@ -51,36 +65,39 @@ func init() { type ( // UDPSession defines a KCP session implemented by UDP UDPSession struct { - // core - sid uint32 - conn net.PacketConn // the underlying packet socket - kcp *KCP // the core ARQ - l *Listener // point to server listener if it's a server socket - block BlockCrypt // encryption - sockbuff []byte // kcp receiving is based on packet, I turn it into stream + sid uint32 // session id(monotonic) + conn net.PacketConn // the underlying packet connection + kcp *KCP // KCP ARQ protocol + l *Listener // point to the Listener if it's accepted by Listener + block BlockCrypt // block encryption - // forward error correction - fec *FEC - fecDataShards [][]byte - fecHeaderOffset int - fecPayloadOffset int - fecCnt int // count datashard - fecMaxSize int // record maximum data length in datashard + // kcp receiving is based on packets + // sockbuff turns packets into stream + sockbuff []byte + + fec *FEC // forward error correction + fecDataShards [][]byte // data shards cache + fecShardCount int // count the number of datashards collected + fecMaxSize int // record maximum data length in datashard + + fecHeaderOffset int // FEC header offset in packet + fecPayloadOffset int // FEC payload offset in packet // settings - remote net.Addr + remote net.Addr // remote peer address rd time.Time // read deadline wd time.Time // write deadline - headerSize int - updateInterval int32 - ackNoDelay bool + headerSize int // the overall header size added before KCP frame + updateInterval int32 // interval in seconds to call kcp.flush() + ackNoDelay bool // send ack immediately for each incoming packet // notifications - die chan struct{} - chReadEvent chan struct{} - chWriteEvent chan struct{} - isClosed bool - mu sync.Mutex + die chan struct{} // notify session has Closed + chReadEvent chan struct{} // notify Read() can be called without blocking + chWriteEvent chan struct{} // notify Write() can be called without blocking + + isClosed bool // flag the session has Closed + mu sync.Mutex } setReadBuffer interface { @@ -132,7 +149,6 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn sess.output(buf[:size]) } }) - sess.kcp.WndSize(defaultWndSize, defaultWndSize) sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize) sess.kcp.setFEC(dataShards, parityShards) @@ -324,11 +340,16 @@ func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) { s.kcp.WndSize(sndwnd, rcvwnd) } -// SetMtu sets the maximum transmission unit -func (s *UDPSession) SetMtu(mtu int) { +// SetMtu sets the maximum transmission unit(not including UDP header) +func (s *UDPSession) SetMtu(mtu int) bool { + if mtu > mtuLimit { + return false + } + s.mu.Lock() defer s.mu.Unlock() s.kcp.SetMtu(mtu - s.headerSize) + return true } // SetStreamMode toggles the stream mode on/off @@ -416,9 +437,9 @@ func (s *UDPSession) output(buf []byte) { // copy data to fec datashards sz := len(ext) - s.fecDataShards[s.fecCnt] = s.fecDataShards[s.fecCnt][:sz] - copy(s.fecDataShards[s.fecCnt], ext) - s.fecCnt++ + s.fecDataShards[s.fecShardCount] = s.fecDataShards[s.fecShardCount][:sz] + copy(s.fecDataShards[s.fecShardCount], ext) + s.fecShardCount++ // record max datashard length if sz > s.fecMaxSize { @@ -426,7 +447,7 @@ func (s *UDPSession) output(buf []byte) { } // calculate Reed-Solomon Erasure Code - if s.fecCnt == s.fec.dataShards { + if s.fecShardCount == s.fec.dataShards { // bzero each datashard's tail for i := 0; i < s.fec.dataShards; i++ { shard := s.fecDataShards[i] @@ -442,7 +463,7 @@ func (s *UDPSession) output(buf []byte) { } // reset counters to zero - s.fecCnt = 0 + s.fecShardCount = 0 s.fecMaxSize = 0 } } @@ -557,7 +578,7 @@ func (s *UDPSession) kcpInput(data []byte) { s.mu.Unlock() } - atomic.AddUint64(&DefaultSnmp.InSegs, 1) + atomic.AddUint64(&DefaultSnmp.InPkts, 1) atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data))) if fecParityShards > 0 { atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards) @@ -626,21 +647,23 @@ func (s *UDPSession) readLoop() { type ( // Listener defines a server listening for connections Listener struct { - block BlockCrypt - dataShards, parityShards int - fec *FEC // for fec init test - conn net.PacketConn - sessions map[string]*UDPSession - chAccepts chan *UDPSession - chDeadlinks chan net.Addr - headerSize int - die chan struct{} - rxbuf sync.Pool - rd atomic.Value - wd atomic.Value + block BlockCrypt // block encryption + dataShards int // FEC data shard + parityShards int // FEC parity shard + fec *FEC // FEC mock initialization + conn net.PacketConn // the underlying packet connection + + sessions map[string]*UDPSession // all sessions accepted by this Listener + chAccepts chan *UDPSession // Listen() backlog + chDeadlinks chan net.Addr // session close queue + headerSize int // the overall header size added before KCP frame + die chan struct{} // notify the listener has closed + rd atomic.Value // read deadline for Accept() + wd atomic.Value } - packet struct { + // incoming packet + inPacket struct { from net.Addr data []byte } @@ -648,7 +671,7 @@ type ( // monitor incoming data for all connections of server func (l *Listener) monitor() { - chPacket := make(chan packet, rxQueueLimit) + chPacket := make(chan inPacket, rxQueueLimit) go l.receiver(chPacket) for { select { @@ -699,7 +722,7 @@ func (l *Listener) monitor() { } } - l.rxbuf.Put(raw) + xmitBuf.Put(raw) case deadlink := <-l.chDeadlinks: delete(l.sessions, deadlink.String()) case <-l.die: @@ -708,11 +731,11 @@ func (l *Listener) monitor() { } } -func (l *Listener) receiver(ch chan packet) { +func (l *Listener) receiver(ch chan inPacket) { for { - data := l.rxbuf.Get().([]byte)[:mtuLimit] + data := xmitBuf.Get().([]byte)[:mtuLimit] if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD { - ch <- packet{from, data[:n]} + ch <- inPacket{from, data[:n]} } else if err != nil { return } else { @@ -829,9 +852,6 @@ func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo l.parityShards = parityShards l.block = block l.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards) - l.rxbuf.New = func() interface{} { - return make([]byte, mtuLimit) - } // calculate header size if l.block != nil { diff --git a/vendor/github.com/xtaci/kcp-go/snmp.go b/vendor/github.com/xtaci/kcp-go/snmp.go index baf7e32bc..607118e3a 100644 --- a/vendor/github.com/xtaci/kcp-go/snmp.go +++ b/vendor/github.com/xtaci/kcp-go/snmp.go @@ -7,22 +7,24 @@ import ( // Snmp defines network statistics indicator type Snmp struct { - BytesSent uint64 // raw bytes sent - BytesReceived uint64 - MaxConn uint64 - ActiveOpens uint64 - PassiveOpens uint64 - CurrEstab uint64 // count of connections for now - InErrs uint64 // udp read errors + BytesSent uint64 // bytes sent from upper level + BytesReceived uint64 // bytes received to upper level + MaxConn uint64 // max number of connections ever reached + ActiveOpens uint64 // accumulated active open connections + PassiveOpens uint64 // accumulated passive open connections + CurrEstab uint64 // current number of established connections + InErrs uint64 // UDP read errors reported from net.PacketConn InCsumErrors uint64 // checksum errors from CRC32 - KCPInErrors uint64 // packet iput errors from kcp - InSegs uint64 - OutSegs uint64 - InBytes uint64 // udp bytes received - OutBytes uint64 // udp bytes sent - RetransSegs uint64 - FastRetransSegs uint64 - EarlyRetransSegs uint64 + KCPInErrors uint64 // packet iput errors reported from KCP + InPkts uint64 // incoming packets count + OutPkts uint64 // outgoing packets count + InSegs uint64 // incoming KCP segments + OutSegs uint64 // outgoing KCP segments + InBytes uint64 // UDP bytes received + OutBytes uint64 // UDP bytes sent + RetransSegs uint64 // accmulated retransmited segments + FastRetransSegs uint64 // accmulated fast retransmitted segments + EarlyRetransSegs uint64 // accmulated early retransmitted segments LostSegs uint64 // number of segs infered as lost RepeatSegs uint64 // number of segs duplicated FECRecovered uint64 // correct packets recovered from FEC @@ -47,6 +49,8 @@ func (s *Snmp) Header() []string { "InErrs", "InCsumErrors", "KCPInErrors", + "InPkts", + "OutPkts", "InSegs", "OutSegs", "InBytes", @@ -76,6 +80,8 @@ func (s *Snmp) ToSlice() []string { fmt.Sprint(snmp.InErrs), fmt.Sprint(snmp.InCsumErrors), fmt.Sprint(snmp.KCPInErrors), + fmt.Sprint(snmp.InPkts), + fmt.Sprint(snmp.OutPkts), fmt.Sprint(snmp.InSegs), fmt.Sprint(snmp.OutSegs), fmt.Sprint(snmp.InBytes), @@ -104,6 +110,8 @@ func (s *Snmp) Copy() *Snmp { d.InErrs = atomic.LoadUint64(&s.InErrs) d.InCsumErrors = atomic.LoadUint64(&s.InCsumErrors) d.KCPInErrors = atomic.LoadUint64(&s.KCPInErrors) + d.InPkts = atomic.LoadUint64(&s.InPkts) + d.OutPkts = atomic.LoadUint64(&s.OutPkts) d.InSegs = atomic.LoadUint64(&s.InSegs) d.OutSegs = atomic.LoadUint64(&s.OutSegs) d.InBytes = atomic.LoadUint64(&s.InBytes) @@ -131,6 +139,8 @@ func (s *Snmp) Reset() { atomic.StoreUint64(&s.InErrs, 0) atomic.StoreUint64(&s.InCsumErrors, 0) atomic.StoreUint64(&s.KCPInErrors, 0) + atomic.StoreUint64(&s.InPkts, 0) + atomic.StoreUint64(&s.OutPkts, 0) atomic.StoreUint64(&s.InSegs, 0) atomic.StoreUint64(&s.OutSegs, 0) atomic.StoreUint64(&s.InBytes, 0) diff --git a/vendor/github.com/xtaci/kcp-go/updater.go b/vendor/github.com/xtaci/kcp-go/updater.go index 2df6b9c2c..630d4e0ea 100644 --- a/vendor/github.com/xtaci/kcp-go/updater.go +++ b/vendor/github.com/xtaci/kcp-go/updater.go @@ -13,12 +13,14 @@ func init() { go updater.updateTask() } +// entry contains a session update info type entry struct { sid uint32 ts time.Time s *UDPSession } +// a global heap managed kcp.flush() caller type updateHeap struct { entries []entry indices map[uint32]int diff --git a/vendor/github.com/xtaci/smux/LICENSE b/vendor/github.com/xtaci/smux/LICENSE new file mode 100644 index 000000000..eed41acb1 --- /dev/null +++ b/vendor/github.com/xtaci/smux/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016-2017 Daniel Fu + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/xtaci/smux/frame.go b/vendor/github.com/xtaci/smux/frame.go new file mode 100644 index 000000000..36062d7be --- /dev/null +++ b/vendor/github.com/xtaci/smux/frame.go @@ -0,0 +1,60 @@ +package smux + +import ( + "encoding/binary" + "fmt" +) + +const ( + version = 1 +) + +const ( // cmds + cmdSYN byte = iota // stream open + cmdFIN // stream close, a.k.a EOF mark + cmdPSH // data push + cmdNOP // no operation +) + +const ( + sizeOfVer = 1 + sizeOfCmd = 1 + sizeOfLength = 2 + sizeOfSid = 4 + headerSize = sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength +) + +// Frame defines a packet from or to be multiplexed into a single connection +type Frame struct { + ver byte + cmd byte + sid uint32 + data []byte +} + +func newFrame(cmd byte, sid uint32) Frame { + return Frame{ver: version, cmd: cmd, sid: sid} +} + +type rawHeader []byte + +func (h rawHeader) Version() byte { + return h[0] +} + +func (h rawHeader) Cmd() byte { + return h[1] +} + +func (h rawHeader) Length() uint16 { + return binary.LittleEndian.Uint16(h[2:]) +} + +func (h rawHeader) StreamID() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} + +func (h rawHeader) String() string { + return fmt.Sprintf("Version:%d Cmd:%d StreamID:%d Length:%d", + h.Version(), h.Cmd(), h.StreamID(), h.Length()) +} diff --git a/vendor/github.com/xtaci/smux/mux.go b/vendor/github.com/xtaci/smux/mux.go new file mode 100644 index 000000000..afcf58b49 --- /dev/null +++ b/vendor/github.com/xtaci/smux/mux.go @@ -0,0 +1,80 @@ +package smux + +import ( + "fmt" + "io" + "time" + + "github.com/pkg/errors" +) + +// Config is used to tune the Smux session +type Config struct { + // KeepAliveInterval is how often to send a NOP command to the remote + KeepAliveInterval time.Duration + + // KeepAliveTimeout is how long the session + // will be closed if no data has arrived + KeepAliveTimeout time.Duration + + // MaxFrameSize is used to control the maximum + // frame size to sent to the remote + MaxFrameSize int + + // MaxReceiveBuffer is used to control the maximum + // number of data in the buffer pool + MaxReceiveBuffer int +} + +// DefaultConfig is used to return a default configuration +func DefaultConfig() *Config { + return &Config{ + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 4096, + MaxReceiveBuffer: 4194304, + } +} + +// VerifyConfig is used to verify the sanity of configuration +func VerifyConfig(config *Config) error { + if config.KeepAliveInterval == 0 { + return errors.New("keep-alive interval must be positive") + } + if config.KeepAliveTimeout < config.KeepAliveInterval { + return fmt.Errorf("keep-alive timeout must be larger than keep-alive interval") + } + if config.MaxFrameSize <= 0 { + return errors.New("max frame size must be positive") + } + if config.MaxFrameSize > 65535 { + return errors.New("max frame size must not be larger than 65535") + } + if config.MaxReceiveBuffer <= 0 { + return errors.New("max receive buffer must be positive") + } + return nil +} + +// Server is used to initialize a new server-side connection. +func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, false), nil +} + +// Client is used to initialize a new client-side connection. +func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, true), nil +} diff --git a/vendor/github.com/xtaci/smux/session.go b/vendor/github.com/xtaci/smux/session.go new file mode 100644 index 000000000..1a2cf25e6 --- /dev/null +++ b/vendor/github.com/xtaci/smux/session.go @@ -0,0 +1,335 @@ +package smux + +import ( + "encoding/binary" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" +) + +const ( + defaultAcceptBacklog = 1024 +) + +const ( + errBrokenPipe = "broken pipe" + errInvalidProtocol = "invalid protocol version" +) + +type writeRequest struct { + frame Frame + result chan writeResult +} + +type writeResult struct { + n int + err error +} + +// Session defines a multiplexed connection for streams +type Session struct { + conn io.ReadWriteCloser + + config *Config + nextStreamID uint32 // next stream identifier + + bucket int32 // token bucket + bucketCond *sync.Cond // used for waiting for tokens + + streams map[uint32]*Stream // all streams in this session + streamLock sync.Mutex // locks streams + + die chan struct{} // flag session has died + dieLock sync.Mutex + chAccepts chan *Stream + + dataReady int32 // flag data has arrived + + deadline atomic.Value + + writes chan writeRequest +} + +func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { + s := new(Session) + s.die = make(chan struct{}) + s.conn = conn + s.config = config + s.streams = make(map[uint32]*Stream) + s.chAccepts = make(chan *Stream, defaultAcceptBacklog) + s.bucket = int32(config.MaxReceiveBuffer) + s.bucketCond = sync.NewCond(&sync.Mutex{}) + s.writes = make(chan writeRequest) + + if client { + s.nextStreamID = 1 + } else { + s.nextStreamID = 2 + } + go s.recvLoop() + go s.sendLoop() + go s.keepalive() + return s +} + +// OpenStream is used to create a new stream +func (s *Session) OpenStream() (*Stream, error) { + if s.IsClosed() { + return nil, errors.New(errBrokenPipe) + } + + sid := atomic.AddUint32(&s.nextStreamID, 2) + stream := newStream(sid, s.config.MaxFrameSize, s) + + if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil { + return nil, errors.Wrap(err, "writeFrame") + } + + s.streamLock.Lock() + s.streams[sid] = stream + s.streamLock.Unlock() + return stream, nil +} + +// AcceptStream is used to block until the next available stream +// is ready to be accepted. +func (s *Session) AcceptStream() (*Stream, error) { + var deadline <-chan time.Time + if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + select { + case stream := <-s.chAccepts: + return stream, nil + case <-deadline: + return nil, errTimeout + case <-s.die: + return nil, errors.New(errBrokenPipe) + } +} + +// Close is used to close the session and all streams. +func (s *Session) Close() (err error) { + s.dieLock.Lock() + + select { + case <-s.die: + s.dieLock.Unlock() + return errors.New(errBrokenPipe) + default: + close(s.die) + s.dieLock.Unlock() + s.streamLock.Lock() + for k := range s.streams { + s.streams[k].sessionClose() + } + s.streamLock.Unlock() + s.bucketCond.Signal() + return s.conn.Close() + } +} + +// IsClosed does a safe check to see if we have shutdown +func (s *Session) IsClosed() bool { + select { + case <-s.die: + return true + default: + return false + } +} + +// NumStreams returns the number of currently open streams +func (s *Session) NumStreams() int { + if s.IsClosed() { + return 0 + } + s.streamLock.Lock() + defer s.streamLock.Unlock() + return len(s.streams) +} + +// SetDeadline sets a deadline used by Accept* calls. +// A zero time value disables the deadline. +func (s *Session) SetDeadline(t time.Time) error { + s.deadline.Store(t) + return nil +} + +// notify the session that a stream has closed +func (s *Session) streamClosed(sid uint32) { + s.streamLock.Lock() + if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.bucketCond.Signal() + } + } + delete(s.streams, sid) + s.streamLock.Unlock() +} + +// returnTokens is called by stream to return token after read +func (s *Session) returnTokens(n int) { + oldvalue := atomic.LoadInt32(&s.bucket) + newvalue := atomic.AddInt32(&s.bucket, int32(n)) + if oldvalue <= 0 && newvalue > 0 { + s.bucketCond.Signal() + } + +} + +// session read a frame from underlying connection +// it's data is pointed to the input buffer +func (s *Session) readFrame(buffer []byte) (f Frame, err error) { + if _, err := io.ReadFull(s.conn, buffer[:headerSize]); err != nil { + return f, errors.Wrap(err, "readFrame") + } + + dec := rawHeader(buffer) + if dec.Version() != version { + return f, errors.New(errInvalidProtocol) + } + + f.ver = dec.Version() + f.cmd = dec.Cmd() + f.sid = dec.StreamID() + if length := dec.Length(); length > 0 { + if _, err := io.ReadFull(s.conn, buffer[headerSize:headerSize+length]); err != nil { + return f, errors.Wrap(err, "readFrame") + } + f.data = buffer[headerSize : headerSize+length] + } + return f, nil +} + +// recvLoop keeps on reading from underlying connection if tokens are available +func (s *Session) recvLoop() { + buffer := make([]byte, (1<<16)+headerSize) + for { + s.bucketCond.L.Lock() + for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() { + s.bucketCond.Wait() + } + s.bucketCond.L.Unlock() + + if s.IsClosed() { + return + } + + if f, err := s.readFrame(buffer); err == nil { + atomic.StoreInt32(&s.dataReady, 1) + + switch f.cmd { + case cmdNOP: + case cmdSYN: + s.streamLock.Lock() + if _, ok := s.streams[f.sid]; !ok { + stream := newStream(f.sid, s.config.MaxFrameSize, s) + s.streams[f.sid] = stream + select { + case s.chAccepts <- stream: + case <-s.die: + } + } + s.streamLock.Unlock() + case cmdFIN: + s.streamLock.Lock() + if stream, ok := s.streams[f.sid]; ok { + stream.markRST() + stream.notifyReadEvent() + } + s.streamLock.Unlock() + case cmdPSH: + s.streamLock.Lock() + if stream, ok := s.streams[f.sid]; ok { + atomic.AddInt32(&s.bucket, -int32(len(f.data))) + stream.pushBytes(f.data) + stream.notifyReadEvent() + } + s.streamLock.Unlock() + default: + s.Close() + return + } + } else { + s.Close() + return + } + } +} + +func (s *Session) keepalive() { + tickerPing := time.NewTicker(s.config.KeepAliveInterval) + tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout) + defer tickerPing.Stop() + defer tickerTimeout.Stop() + for { + select { + case <-tickerPing.C: + s.writeFrame(newFrame(cmdNOP, 0)) + s.bucketCond.Signal() // force a signal to the recvLoop + case <-tickerTimeout.C: + if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) { + s.Close() + return + } + case <-s.die: + return + } + } +} + +func (s *Session) sendLoop() { + buf := make([]byte, (1<<16)+headerSize) + for { + select { + case <-s.die: + return + case request, ok := <-s.writes: + if !ok { + continue + } + buf[0] = request.frame.ver + buf[1] = request.frame.cmd + binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data))) + binary.LittleEndian.PutUint32(buf[4:], request.frame.sid) + copy(buf[headerSize:], request.frame.data) + n, err := s.conn.Write(buf[:headerSize+len(request.frame.data)]) + + n -= headerSize + if n < 0 { + n = 0 + } + + result := writeResult{ + n: n, + err: err, + } + + request.result <- result + close(request.result) + } + } +} + +// writeFrame writes the frame to the underlying connection +// and returns the number of bytes written if successful +func (s *Session) writeFrame(f Frame) (n int, err error) { + req := writeRequest{ + frame: f, + result: make(chan writeResult, 1), + } + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + case s.writes <- req: + } + + result := <-req.result + return result.n, result.err +} diff --git a/vendor/github.com/xtaci/smux/stream.go b/vendor/github.com/xtaci/smux/stream.go new file mode 100644 index 000000000..cf61b451b --- /dev/null +++ b/vendor/github.com/xtaci/smux/stream.go @@ -0,0 +1,261 @@ +package smux + +import ( + "bytes" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" +) + +// Stream implements net.Conn +type Stream struct { + id uint32 + rstflag int32 + sess *Session + buffer bytes.Buffer + bufferLock sync.Mutex + frameSize int + chReadEvent chan struct{} // notify a read event + die chan struct{} // flag the stream has closed + dieLock sync.Mutex + readDeadline atomic.Value + writeDeadline atomic.Value +} + +// newStream initiates a Stream struct +func newStream(id uint32, frameSize int, sess *Session) *Stream { + s := new(Stream) + s.id = id + s.chReadEvent = make(chan struct{}, 1) + s.frameSize = frameSize + s.sess = sess + s.die = make(chan struct{}) + return s +} + +// ID returns the unique stream ID. +func (s *Stream) ID() uint32 { + return s.id +} + +// Read implements net.Conn +func (s *Stream) Read(b []byte) (n int, err error) { + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + +READ: + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + case <-deadline: + return n, errTimeout + default: + } + + s.bufferLock.Lock() + n, err = s.buffer.Read(b) + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + return n, nil + } else if atomic.LoadInt32(&s.rstflag) == 1 { + _ = s.Close() + return 0, io.EOF + } + + select { + case <-s.chReadEvent: + goto READ + case <-deadline: + return n, errTimeout + case <-s.die: + return 0, errors.New(errBrokenPipe) + } +} + +// Write implements net.Conn +func (s *Stream) Write(b []byte) (n int, err error) { + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + default: + } + + frames := s.split(b, cmdPSH, s.id) + sent := 0 + for k := range frames { + req := writeRequest{ + frame: frames[k], + result: make(chan writeResult, 1), + } + + select { + case s.sess.writes <- req: + case <-s.die: + return sent, errors.New(errBrokenPipe) + case <-deadline: + return sent, errTimeout + } + + select { + case result := <-req.result: + sent += result.n + if result.err != nil { + return sent, result.err + } + case <-s.die: + return sent, errors.New(errBrokenPipe) + case <-deadline: + return sent, errTimeout + } + } + return sent, nil +} + +// Close implements net.Conn +func (s *Stream) Close() error { + s.dieLock.Lock() + + select { + case <-s.die: + s.dieLock.Unlock() + return errors.New(errBrokenPipe) + default: + close(s.die) + s.dieLock.Unlock() + s.sess.streamClosed(s.id) + _, err := s.sess.writeFrame(newFrame(cmdFIN, s.id)) + return err + } +} + +// SetReadDeadline sets the read deadline as defined by +// net.Conn.SetReadDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetReadDeadline(t time.Time) error { + s.readDeadline.Store(t) + return nil +} + +// SetWriteDeadline sets the write deadline as defined by +// net.Conn.SetWriteDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetWriteDeadline(t time.Time) error { + s.writeDeadline.Store(t) + return nil +} + +// SetDeadline sets both read and write deadlines as defined by +// net.Conn.SetDeadline. +// A zero time value disables the deadlines. +func (s *Stream) SetDeadline(t time.Time) error { + if err := s.SetReadDeadline(t); err != nil { + return err + } + if err := s.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +// session closes the stream +func (s *Stream) sessionClose() { + s.dieLock.Lock() + defer s.dieLock.Unlock() + + select { + case <-s.die: + default: + close(s.die) + } +} + +// LocalAddr satisfies net.Conn interface +func (s *Stream) LocalAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Stream) RemoteAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// pushBytes a slice into buffer +func (s *Stream) pushBytes(p []byte) { + s.bufferLock.Lock() + s.buffer.Write(p) + s.bufferLock.Unlock() +} + +// recycleTokens transform remaining bytes to tokens(will truncate buffer) +func (s *Stream) recycleTokens() (n int) { + s.bufferLock.Lock() + n = s.buffer.Len() + s.buffer.Reset() + s.bufferLock.Unlock() + return +} + +// split large byte buffer into smaller frames, reference only +func (s *Stream) split(bts []byte, cmd byte, sid uint32) []Frame { + var frames []Frame + for len(bts) > s.frameSize { + frame := newFrame(cmd, sid) + frame.data = bts[:s.frameSize] + bts = bts[s.frameSize:] + frames = append(frames, frame) + } + if len(bts) > 0 { + frame := newFrame(cmd, sid) + frame.data = bts + frames = append(frames, frame) + } + return frames +} + +// notify read event +func (s *Stream) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +// mark this stream has been reset +func (s *Stream) markRST() { + atomic.StoreInt32(&s.rstflag, 1) +} + +var errTimeout error = &timeoutError{} + +type timeoutError struct{} + +func (e *timeoutError) Error() string { return "i/o timeout" } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return true } diff --git a/vendor/manifest b/vendor/manifest index 4805ea189..c61edb430 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -199,14 +199,6 @@ "revision": "5f1c01d9f64b941dd9582c638279d046eda6ca31", "branch": "master" }, - { - "importpath": "github.com/hashicorp/yamux", - "repository": "https://github.com/hashicorp/yamux", - "vcs": "git", - "revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd", - "branch": "master", - "notests": true - }, { "importpath": "github.com/jackpal/gateway", "repository": "https://github.com/jackpal/gateway", @@ -354,7 +346,15 @@ "importpath": "github.com/xtaci/kcp-go", "repository": "https://github.com/xtaci/kcp-go", "vcs": "git", - "revision": "0ca962cb10f29ee0735ff7dec69ec7283af47f65", + "revision": "f918e6d43cb5e8398d91e1767ec61bed7b7b4d49", + "branch": "master", + "notests": true + }, + { + "importpath": "github.com/xtaci/smux", + "repository": "https://github.com/xtaci/smux", + "vcs": "git", + "revision": "bfc89bc3f7f7791e35a10b24496cc7454a9b4a64", "branch": "master", "notests": true },