diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index fdb729751..e7dbb8e8f 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -27,6 +27,10 @@ "ImportPath": "github.com/codegangsta/martini", "Comment": "v0.1-142-g8659df7", "Rev": "8659df7a51aebe6c6120268cd5a8b4c34fa8441a" + }, + { + "ImportPath": "github.com/juju/ratelimit", + "Rev": "cbaa435c80a9716e086f25d409344b26c4039358" } ] } diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE new file mode 100644 index 000000000..53320c352 --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE @@ -0,0 +1,185 @@ +This software is licensed under the LGPLv3, included below. + +As a special exception to the GNU Lesser General Public License version 3 +("LGPL3"), the copyright holders of this Library give you permission to +convey to a third party a Combined Work that links statically or dynamically +to this Library without providing any Minimal Corresponding Source or +Minimal Application Code as set out in 4d or providing the installation +information set out in section 4e, provided that you comply with the other +provisions of LGPL3 and provided that you meet, for the Application the +terms and conditions of the license(s) which apply to the Application. + +Except as stated in this special exception, the provisions of LGPL3 will +continue to comply in full to this Library. If you modify this Library, you +may apply this exception to your version of this Library, but you are not +obliged to do so. If you do not wish to do so, delete this exception +statement from your version. This exception does not (and cannot) modify any +license terms which apply to the Application, with which you must still +comply. + + + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/README.md b/Godeps/_workspace/src/github.com/juju/ratelimit/README.md new file mode 100644 index 000000000..270ee914c --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/README.md @@ -0,0 +1,109 @@ +# ratelimit +-- + import "github.com/juju/ratelimit" + +The ratelimit package provides an efficient token bucket implementation. See +http://en.wikipedia.org/wiki/Token_bucket. + +## Usage + +#### func Reader + +```go +func Reader(r io.Reader, bucket *Bucket) io.Reader +``` +Reader returns a reader that is rate limited by the given token bucket. Each +token in the bucket represents one byte. + +#### func Writer + +```go +func Writer(w io.Writer, bucket *Bucket) io.Writer +``` +Writer returns a reader that is rate limited by the given token bucket. Each +token in the bucket represents one byte. + +#### type Bucket + +```go +type Bucket struct { +} +``` + +Bucket represents a token bucket that fills at a predetermined rate. Methods on +Bucket may be called concurrently. + +#### func NewBucket + +```go +func NewBucket(fillInterval time.Duration, capacity int64) *Bucket +``` +NewBucket returns a new token bucket that fills at the rate of one token every +fillInterval, up to the given maximum capacity. Both arguments must be positive. +The bucket is initially full. + +#### func NewBucketWithRate + +```go +func NewBucketWithRate(rate float64, capacity int64) *Bucket +``` +NewBucketWithRate returns a token bucket that fills the bucket at the rate of +rate tokens per second up to the given maximum capacity. Because of limited +clock resolution, at high rates, the actual rate may be up to 1% different from +the specified rate. + +#### func (*Bucket) Rate + +```go +func (tb *Bucket) Rate() float64 +``` +Rate returns the fill rate of the bucket, in tokens per second. + +#### func (*Bucket) Take + +```go +func (tb *Bucket) Take(count int64) time.Duration +``` +Take takes count tokens from the bucket without blocking. It returns the time +that the caller should wait until the tokens are actually available. + +Note that if the request is irrevocable - there is no way to return tokens to +the bucket once this method commits us to taking them. + +#### func (*Bucket) TakeAvailable + +```go +func (tb *Bucket) TakeAvailable(count int64) int64 +``` +TakeAvailable takes up to count immediately available tokens from the bucket. It +returns the number of tokens removed, or zero if there are no available tokens. +It does not block. + +#### func (*Bucket) TakeMaxDuration + +```go +func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) +``` +TakeMaxDuration is like Take, except that it will only take tokens from the +bucket if the wait time for the tokens is no greater than maxWait. + +If it would take longer than maxWait for the tokens to become available, it does +nothing and reports false, otherwise it returns the time that the caller should +wait until the tokens are actually available, and reports true. + +#### func (*Bucket) Wait + +```go +func (tb *Bucket) Wait(count int64) +``` +Wait takes count tokens from the bucket, waiting until they are available. + +#### func (*Bucket) WaitMaxDuration + +```go +func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool +``` +WaitMaxDuration is like Wait except that it will only take tokens from the +bucket if it needs to wait for no greater than maxWait. It reports whether any +tokens have been removed from the bucket If no tokens have been removed, it +returns immediately. diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go new file mode 100644 index 000000000..4b42926a1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go @@ -0,0 +1,227 @@ +// Copyright 2014 Canonical Ltd. +// Licensed under the LGPLv3 with static-linking exception. +// See LICENCE file for details. + +// The ratelimit package provides an efficient token bucket implementation. +// See http://en.wikipedia.org/wiki/Token_bucket. +package ratelimit + +import ( + "strconv" + "sync" + "time" +) + +// Bucket represents a token bucket that fills at a predetermined rate. +// Methods on Bucket may be called concurrently. +type Bucket struct { + startTime time.Time + capacity int64 + quantum int64 + fillInterval time.Duration + + // The mutex guards the fields following it. + mu sync.Mutex + + // avail holds the number of available tokens + // in the bucket, as of availTick ticks from startTime. + // It will be negative when there are consumers + // waiting for tokens. + avail int64 + availTick int64 +} + +// NewBucket returns a new token bucket that fills at the +// rate of one token every fillInterval, up to the given +// maximum capacity. Both arguments must be +// positive. The bucket is initially full. +func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { + return newBucketWithQuantum(fillInterval, capacity, 1) +} + +// rateMargin specifes the allowed variance of actual +// rate from specified rate. 1% seems reasonable. +const rateMargin = 0.01 + +// NewBucketWithRate returns a token bucket that fills the bucket +// at the rate of rate tokens per second up to the given +// maximum capacity. Because of limited clock resolution, +// at high rates, the actual rate may be up to 1% different from the +// specified rate. +func NewBucketWithRate(rate float64, capacity int64) *Bucket { + for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) { + fillInterval := time.Duration(1e9 * float64(quantum) / rate) + if fillInterval <= 0 { + continue + } + tb := newBucketWithQuantum(fillInterval, capacity, quantum) + if diff := abs(tb.Rate() - rate); diff/rate <= rateMargin { + return tb + } + } + panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64)) +} + +// nextQuantum returns the next quantum to try after q. +// We grow the quantum exponentially, but slowly, so we +// get a good fit in the lower numbers. +func nextQuantum(q int64) int64 { + q1 := q * 11 / 10 + if q1 == q { + q1++ + } + return q1 +} + +// newBucketWithQuantum is similar to NewBucket, but allows +// the specification of the quantum size - quantum tokens +// are added every fillInterval. This is so that we can get accurate +// rates even when we want to add more than one token per ns. +func newBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket { + if fillInterval <= 0 { + panic("token bucket fill interval is not > 0") + } + if capacity <= 0 { + panic("token bucket capacity is not > 0") + } + if quantum <= 0 { + panic("token bucket quantum is not > 0") + } + return &Bucket{ + startTime: time.Now(), + capacity: capacity, + quantum: quantum, + avail: capacity, + fillInterval: fillInterval, + } +} + +// Wait takes count tokens from the bucket, waiting until they are +// available. +func (tb *Bucket) Wait(count int64) { + if d := tb.Take(count); d > 0 { + time.Sleep(d) + } +} + +// WaitMaxDuration is like Wait except that it will +// only take tokens from the bucket if it needs to wait +// for no greater than maxWait. It reports whether +// any tokens have been removed from the bucket +// If no tokens have been removed, it returns immediately. +func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool { + d, ok := tb.TakeMaxDuration(count, maxWait) + if d > 0 { + time.Sleep(d) + } + return ok +} + +const infinityDuration time.Duration = 0x7fffffffffffffff + +// Take takes count tokens from the bucket without blocking. It returns +// the time that the caller should wait until the tokens are actually +// available. +// +// Note that if the request is irrevocable - there is no way to return +// tokens to the bucket once this method commits us to taking them. +func (tb *Bucket) Take(count int64) time.Duration { + d, _ := tb.take(time.Now(), count, infinityDuration) + return d +} + +// TakeMaxDuration is like Take, except that +// it will only take tokens from the bucket if the wait +// time for the tokens is no greater than maxWait. +// +// If it would take longer than maxWait for the tokens +// to become available, it does nothing and reports false, +// otherwise it returns the time that the caller should +// wait until the tokens are actually available, and reports +// true. +func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) { + return tb.take(time.Now(), count, maxWait) +} + +// TakeAvailable takes up to count immediately available tokens from the +// bucket. It returns the number of tokens removed, or zero if there are +// no available tokens. It does not block. +func (tb *Bucket) TakeAvailable(count int64) int64 { + return tb.takeAvailable(time.Now(), count) +} + +// takeAvailable is the internal version of TakeAvailable - it takes the +// current time as an argument to enable easy testing. +func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { + if count <= 0 { + return 0 + } + tb.mu.Lock() + defer tb.mu.Unlock() + + tb.adjust(now) + if tb.avail <= 0 { + return 0 + } + if count > tb.avail { + count = tb.avail + } + tb.avail -= count + return count +} + +// Rate returns the fill rate of the bucket, in tokens per second. +func (tb *Bucket) Rate() float64 { + return 1e9 * float64(tb.quantum) / float64(tb.fillInterval) +} + +// take is the internal version of Take - it takes the current time as +// an argument to enable easy testing. +func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { + if count <= 0 { + return 0, true + } + tb.mu.Lock() + defer tb.mu.Unlock() + + currentTick := tb.adjust(now) + avail := tb.avail - count + if avail >= 0 { + tb.avail = avail + return 0, true + } + // Round up the missing tokens to the nearest multiple + // of quantum - the tokens won't be available until + // that tick. + endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum + endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) + waitTime := endTime.Sub(now) + if waitTime > maxWait { + return 0, false + } + tb.avail = avail + return waitTime, true +} + +// adjust adjusts the current bucket capacity based on the current time. +// It returns the current tick. +func (tb *Bucket) adjust(now time.Time) (currentTick int64) { + currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval) + + if tb.avail >= tb.capacity { + return + } + tb.avail += (currentTick - tb.availTick) * tb.quantum + if tb.avail > tb.capacity { + tb.avail = tb.capacity + } + tb.availTick = currentTick + return +} + +func abs(f float64) float64 { + if f < 0 { + return -f + } + return f +} diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go new file mode 100644 index 000000000..48dbfe0cc --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go @@ -0,0 +1,328 @@ +// Copyright 2014 Canonical Ltd. +// Licensed under the LGPLv3 with static-linking exception. +// See LICENCE file for details. + +package ratelimit + +import ( + gc "launchpad.net/gocheck" + + "testing" + "time" +) + +func TestPackage(t *testing.T) { + gc.TestingT(t) +} + +type rateLimitSuite struct{} + +var _ = gc.Suite(rateLimitSuite{}) + +type takeReq struct { + time time.Duration + count int64 + expectWait time.Duration +} + +var takeTests = []struct { + about string + fillInterval time.Duration + capacity int64 + reqs []takeReq +}{{ + about: "serial requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 0, + expectWait: 0, + }, { + time: 0, + count: 10, + expectWait: 0, + }, { + time: 0, + count: 1, + expectWait: 250 * time.Millisecond, + }, { + time: 250 * time.Millisecond, + count: 1, + expectWait: 250 * time.Millisecond, + }}, +}, { + about: "concurrent requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 0, + count: 2, + expectWait: 500 * time.Millisecond, + }, { + time: 0, + count: 2, + expectWait: 1000 * time.Millisecond, + }, { + time: 0, + count: 1, + expectWait: 1250 * time.Millisecond, + }}, +}, { + about: "more than capacity", + fillInterval: 1 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 20 * time.Millisecond, + count: 15, + expectWait: 5 * time.Millisecond, + }}, +}, { + about: "sub-quantum time", + fillInterval: 10 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 7 * time.Millisecond, + count: 1, + expectWait: 3 * time.Millisecond, + }, { + time: 8 * time.Millisecond, + count: 1, + expectWait: 12 * time.Millisecond, + }}, +}, { + about: "within capacity", + fillInterval: 10 * time.Millisecond, + capacity: 5, + reqs: []takeReq{{ + time: 0, + count: 5, + expectWait: 0, + }, { + time: 60 * time.Millisecond, + count: 5, + expectWait: 0, + }, { + time: 60 * time.Millisecond, + count: 1, + expectWait: 10 * time.Millisecond, + }, { + time: 80 * time.Millisecond, + count: 2, + expectWait: 10 * time.Millisecond, + }}, +}} + +func (rateLimitSuite) TestTake(c *gc.C) { + for i, test := range takeTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + d, ok := tb.take(tb.startTime.Add(req.time), req.count, infinityDuration) + c.Assert(ok, gc.Equals, true) + if d != req.expectWait { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) + } + } + } +} + +func (rateLimitSuite) TestTakeMaxDuration(c *gc.C) { + for i, test := range takeTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + if req.expectWait > 0 { + d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait-1) + c.Assert(ok, gc.Equals, false) + c.Assert(d, gc.Equals, time.Duration(0)) + } + d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait) + c.Assert(ok, gc.Equals, true) + if d != req.expectWait { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) + } + } + } +} + +type takeAvailableReq struct { + time time.Duration + count int64 + expect int64 +} + +var takeAvailableTests = []struct { + about string + fillInterval time.Duration + capacity int64 + reqs []takeAvailableReq +}{{ + about: "serial requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 0, + expect: 0, + }, { + time: 0, + count: 10, + expect: 10, + }, { + time: 0, + count: 1, + expect: 0, + }, { + time: 250 * time.Millisecond, + count: 1, + expect: 1, + }}, +}, { + about: "concurrent requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 5, + expect: 5, + }, { + time: 0, + count: 2, + expect: 2, + }, { + time: 0, + count: 5, + expect: 3, + }, { + time: 0, + count: 1, + expect: 0, + }}, +}, { + about: "more than capacity", + fillInterval: 1 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 10, + expect: 10, + }, { + time: 20 * time.Millisecond, + count: 15, + expect: 10, + }}, +}, { + about: "within capacity", + fillInterval: 10 * time.Millisecond, + capacity: 5, + reqs: []takeAvailableReq{{ + time: 0, + count: 5, + expect: 5, + }, { + time: 60 * time.Millisecond, + count: 5, + expect: 5, + }, { + time: 70 * time.Millisecond, + count: 1, + expect: 1, + }}, +}} + +func (rateLimitSuite) TestTakeAvailable(c *gc.C) { + for i, test := range takeAvailableTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + d := tb.takeAvailable(tb.startTime.Add(req.time), req.count) + if d != req.expect { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expect) + } + } + } +} + +func (rateLimitSuite) TestPanics(c *gc.C) { + c.Assert(func() { NewBucket(0, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") + c.Assert(func() { NewBucket(-2, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") + c.Assert(func() { NewBucket(1, 0) }, gc.PanicMatches, "token bucket capacity is not > 0") + c.Assert(func() { NewBucket(1, -2) }, gc.PanicMatches, "token bucket capacity is not > 0") +} + +func isCloseTo(x, y, tolerance float64) bool { + return abs(x-y)/y < tolerance +} + +func (rateLimitSuite) TestRate(c *gc.C) { + tb := NewBucket(1, 1) + if !isCloseTo(tb.Rate(), 1e9, 0.00001) { + c.Fatalf("got %v want 1e9", tb.Rate()) + } + tb = NewBucket(2*time.Second, 1) + if !isCloseTo(tb.Rate(), 0.5, 0.00001) { + c.Fatalf("got %v want 0.5", tb.Rate()) + } + tb = newBucketWithQuantum(100*time.Millisecond, 1, 5) + if !isCloseTo(tb.Rate(), 50, 0.00001) { + c.Fatalf("got %v want 50", tb.Rate()) + } +} + +func checkRate(c *gc.C, rate float64) { + tb := NewBucketWithRate(rate, 1<<62) + if !isCloseTo(tb.Rate(), rate, rateMargin) { + c.Fatalf("got %g want %v", tb.Rate(), rate) + } + d, ok := tb.take(tb.startTime, 1<<62, infinityDuration) + c.Assert(ok, gc.Equals, true) + c.Assert(d, gc.Equals, time.Duration(0)) + + // Check that the actual rate is as expected by + // asking for a not-quite multiple of the bucket's + // quantum and checking that the wait time + // correct. + d, ok = tb.take(tb.startTime, tb.quantum*2-tb.quantum/2, infinityDuration) + c.Assert(ok, gc.Equals, true) + expectTime := 1e9 * float64(tb.quantum) * 2 / rate + if !isCloseTo(float64(d), expectTime, rateMargin) { + c.Fatalf("rate %g: got %g want %v", rate, float64(d), expectTime) + } +} + +func (rateLimitSuite) TestNewWithRate(c *gc.C) { + for rate := float64(1); rate < 1e6; rate += 7 { + checkRate(c, rate) + } + for _, rate := range []float64{ + 1024 * 1024 * 1024, + 1e-5, + 0.9e-5, + 0.5, + 0.9, + 0.9e8, + 3e12, + 4e18, + } { + checkRate(c, rate) + checkRate(c, rate/3) + checkRate(c, rate*1.3) + } +} + +func BenchmarkWait(b *testing.B) { + tb := NewBucket(1, 16*1024) + for i := b.N - 1; i >= 0; i-- { + tb.Wait(1) + } +} diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/reader.go b/Godeps/_workspace/src/github.com/juju/ratelimit/reader.go new file mode 100644 index 000000000..6403bf78d --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/reader.go @@ -0,0 +1,51 @@ +// Copyright 2014 Canonical Ltd. +// Licensed under the LGPLv3 with static-linking exception. +// See LICENCE file for details. + +package ratelimit + +import "io" + +type reader struct { + r io.Reader + bucket *Bucket +} + +// Reader returns a reader that is rate limited by +// the given token bucket. Each token in the bucket +// represents one byte. +func Reader(r io.Reader, bucket *Bucket) io.Reader { + return &reader{ + r: r, + bucket: bucket, + } +} + +func (r *reader) Read(buf []byte) (int, error) { + n, err := r.r.Read(buf) + if n <= 0 { + return n, err + } + r.bucket.Wait(int64(n)) + return n, err +} + +type writer struct { + w io.Writer + bucket *Bucket +} + +// Writer returns a reader that is rate limited by +// the given token bucket. Each token in the bucket +// represents one byte. +func Writer(w io.Writer, bucket *Bucket) io.Writer { + return &writer{ + w: w, + bucket: bucket, + } +} + +func (w *writer) Write(buf []byte) (int, error) { + w.bucket.Wait(int64(len(buf))) + return w.w.Write(buf) +} diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 0e1d61b04..3fc3096c2 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -18,6 +18,7 @@ import ( "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/protocol" "github.com/calmh/syncthing/scanner" + "github.com/juju/ratelimit" ) type Model struct { @@ -35,7 +36,7 @@ type Model struct { sup suppressor - limitRequestRate chan struct{} + limitRequestRate *ratelimit.Bucket addedRepo bool started bool @@ -66,18 +67,7 @@ func NewModel(maxChangeBw int) *Model { } func (m *Model) LimitRate(kbps int) { - m.limitRequestRate = make(chan struct{}, kbps) - n := kbps/10 + 1 - go func() { - for { - time.Sleep(100 * time.Millisecond) - for i := 0; i < n; i++ { - select { - case m.limitRequestRate <- struct{}{}: - } - } - } - }() + m.limitRequestRate = ratelimit.NewBucketWithRate(float64(kbps), int64(5*kbps)) } // StartRW starts read/write processing on the current model. When in @@ -362,9 +352,7 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by } if m.limitRequestRate != nil { - for s := 0; s < len(buf); s += 1024 { - <-m.limitRequestRate - } + m.limitRequestRate.Wait(int64(size / 1024)) } return buf, nil