Merge pull request #2181 from calmh/refactor2

Small refactorings on relay
This commit is contained in:
Audrius Butkevicius 2015-08-23 13:02:06 +01:00
commit 37f7c48cfc
5 changed files with 36 additions and 45 deletions

View File

@ -143,7 +143,7 @@ next:
s.mut.RLock() s.mut.RLock()
ct, ok := s.connType[remoteID] ct, ok := s.connType[remoteID]
s.mut.RUnlock() s.mut.RUnlock()
if ok && !ct.IsDirect() && c.ConnType.IsDirect() { if ok && !ct.IsDirect() && c.Type.IsDirect() {
if debugNet { if debugNet {
l.Debugln("Switching connections", remoteID) l.Debugln("Switching connections", remoteID)
} }
@ -194,7 +194,7 @@ next:
rd = &limitedReader{c.Conn, readRateLimit} rd = &limitedReader{c.Conn, readRateLimit}
} }
name := fmt.Sprintf("%s-%s (%s)", c.Conn.LocalAddr(), c.Conn.RemoteAddr(), c.ConnType) name := fmt.Sprintf("%s-%s (%s)", c.Conn.LocalAddr(), c.Conn.RemoteAddr(), c.Type)
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression) protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression)
l.Infof("Established secure connection to %s at %s", remoteID, name) l.Infof("Established secure connection to %s at %s", remoteID, name)
@ -205,10 +205,10 @@ next:
s.model.AddConnection(model.Connection{ s.model.AddConnection(model.Connection{
c.Conn, c.Conn,
protoConn, protoConn,
c.ConnType, c.Type,
}) })
s.mut.Lock() s.mut.Lock()
s.connType[remoteID] = c.ConnType s.connType[remoteID] = c.Type
s.mut.Unlock() s.mut.Unlock()
continue next continue next
} }
@ -219,11 +219,9 @@ next:
"device": remoteID.String(), "device": remoteID.String(),
"address": c.Conn.RemoteAddr().String(), "address": c.Conn.RemoteAddr().String(),
}) })
l.Infof("Connection from %s (%s) with unknown device ID %s", c.Conn.RemoteAddr(), c.ConnType, remoteID)
} else {
l.Infof("Connection from %s (%s) with ignored device ID %s", c.Conn.RemoteAddr(), c.ConnType, remoteID)
} }
l.Infof("Connection from %s (%s) with ignored device ID %s", c.Conn.RemoteAddr(), c.Type, remoteID)
c.Conn.Close() c.Conn.Close()
} }
} }
@ -290,7 +288,7 @@ func (s *connectionSvc) connect() {
} }
s.conns <- model.IntermediateConnection{ s.conns <- model.IntermediateConnection{
conn, model.ConnectionTypeBasicDial, conn, model.ConnectionTypeDirectDial,
} }
continue nextDevice continue nextDevice
} }

View File

@ -99,7 +99,7 @@ func tcpListener(uri *url.URL, tlsCfg *tls.Config, conns chan<- model.Intermedia
} }
conns <- model.IntermediateConnection{ conns <- model.IntermediateConnection{
tc, model.ConnectionTypeBasicAccept, tc, model.ConnectionTypeDirectAccept,
} }
} }
} }

View File

@ -14,8 +14,8 @@ import (
) )
type IntermediateConnection struct { type IntermediateConnection struct {
Conn *tls.Conn *tls.Conn
ConnType ConnectionType Type ConnectionType
} }
type Connection struct { type Connection struct {
@ -25,8 +25,8 @@ type Connection struct {
} }
const ( const (
ConnectionTypeBasicAccept ConnectionType = iota ConnectionTypeDirectAccept ConnectionType = iota
ConnectionTypeBasicDial ConnectionTypeDirectDial
ConnectionTypeRelayAccept ConnectionTypeRelayAccept
ConnectionTypeRelayDial ConnectionTypeRelayDial
) )
@ -35,10 +35,10 @@ type ConnectionType int
func (t ConnectionType) String() string { func (t ConnectionType) String() string {
switch t { switch t {
case ConnectionTypeBasicAccept: case ConnectionTypeDirectAccept:
return "basic-accept" return "direct-accept"
case ConnectionTypeBasicDial: case ConnectionTypeDirectDial:
return "basic-dial" return "direct-dial"
case ConnectionTypeRelayAccept: case ConnectionTypeRelayAccept:
return "relay-accept" return "relay-accept"
case ConnectionTypeRelayDial: case ConnectionTypeRelayDial:
@ -48,5 +48,5 @@ func (t ConnectionType) String() string {
} }
func (t ConnectionType) IsDirect() bool { func (t ConnectionType) IsDirect() bool {
return t == ConnectionTypeBasicAccept || t == ConnectionTypeBasicDial return t == ConnectionTypeDirectAccept || t == ConnectionTypeDirectDial
} }

View File

@ -285,7 +285,7 @@ func BenchmarkRequest(b *testing.B) {
m.AddConnection(Connection{ m.AddConnection(Connection{
&net.TCPConn{}, &net.TCPConn{},
fc, fc,
ConnectionTypeBasicAccept, ConnectionTypeDirectAccept,
}) })
m.Index(device1, "default", files, 0, nil) m.Index(device1, "default", files, 0, nil)
@ -328,7 +328,7 @@ func TestDeviceRename(t *testing.T) {
m.AddConnection(Connection{ m.AddConnection(Connection{
&net.TCPConn{}, &net.TCPConn{},
fc, fc,
ConnectionTypeBasicAccept, ConnectionTypeDirectAccept,
}) })
m.ServeBackground() m.ServeBackground()

View File

@ -54,9 +54,10 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.Intermed
tlsCfg: tlsCfg, tlsCfg: tlsCfg,
conns: conns, conns: conns,
invitations: svc.invitations, invitations: svc.invitations,
stop: make(chan struct{}),
} }
svc.receiverToken = svc.Add(receiver) svc.Add(receiver)
return svc return svc
} }
@ -66,11 +67,10 @@ type Svc struct {
cfg *config.Wrapper cfg *config.Wrapper
tlsCfg *tls.Config tlsCfg *tls.Config
receiverToken suture.ServiceToken tokens map[string]suture.ServiceToken
tokens map[string]suture.ServiceToken clients map[string]*client.ProtocolClient
clients map[string]*client.ProtocolClient mut sync.RWMutex
mut sync.RWMutex invitations chan protocol.SessionInvitation
invitations chan protocol.SessionInvitation
} }
func (s *Svc) VerifyConfiguration(from, to config.Configuration) error { func (s *Svc) VerifyConfiguration(from, to config.Configuration) error {
@ -123,6 +123,7 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
} }
continue continue
} }
for _, relayAnn := range ann.Relays { for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL) ruri, err := url.Parse(relayAnn.URL)
if err != nil { if err != nil {
@ -138,6 +139,8 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
} }
} }
s.mut.Lock()
for key, uri := range existing { for key, uri := range existing {
_, ok := s.tokens[key] _, ok := s.tokens[key]
if !ok { if !ok {
@ -146,9 +149,7 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
} }
c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations) c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
s.tokens[key] = s.Add(c) s.tokens[key] = s.Add(c)
s.mut.Lock()
s.clients[key] = c s.clients[key] = c
s.mut.Unlock()
} }
} }
@ -157,15 +158,15 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
if !ok { if !ok {
err := s.Remove(token) err := s.Remove(token)
delete(s.tokens, key) delete(s.tokens, key)
s.mut.Lock()
delete(s.clients, key) delete(s.clients, key)
s.mut.Unlock()
if debug { if debug {
l.Debugln("Disconnecting from relay", key, err) l.Debugln("Disconnecting from relay", key, err)
} }
} }
} }
s.mut.Unlock()
return true return true
} }
@ -187,11 +188,6 @@ type invitationReceiver struct {
} }
func (r *invitationReceiver) Serve() { func (r *invitationReceiver) Serve() {
if r.stop != nil {
return
}
r.stop = make(chan struct{})
for { for {
select { select {
case inv := <-r.invitations: case inv := <-r.invitations:
@ -227,6 +223,7 @@ func (r *invitationReceiver) Serve() {
r.conns <- model.IntermediateConnection{ r.conns <- model.IntermediateConnection{
tc, model.ConnectionTypeRelayAccept, tc, model.ConnectionTypeRelayAccept,
} }
case <-r.stop: case <-r.stop:
return return
} }
@ -234,17 +231,13 @@ func (r *invitationReceiver) Serve() {
} }
func (r *invitationReceiver) Stop() { func (r *invitationReceiver) Stop() {
if r.stop == nil { close(r.stop)
return
}
r.stop <- struct{}{}
r.stop = nil
} }
// This is the announcement recieved from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct { type dynamicAnnouncement struct {
Relays []relayAnnouncement `json:"relays"` Relays []struct {
} URL string
}
type relayAnnouncement struct {
URL string `json:"url"`
} }