Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kazeburo committed Nov 13, 2018
0 parents commit c4c39f1
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# sabo
64 changes: 64 additions & 0 deletions sabo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"

humanize "github.com/dustin/go-humanize"
flags "github.com/jessevdk/go-flags"
"github.com/kazeburo/sabo/saboreader"
)

// Version set in compile
var Version string

// cmdOpts defines the command-line options parsed by go-flags.
type cmdOpts struct {
	// MaxBandWidth is the total bandwidth budget shared by all sabo
	// processes using the same work dir; parsed with humanize.ParseBytes,
	// so suffixed values like "10M" are accepted.
	MaxBandWidth string `long:"max-bandwidth" description:"max bandwidth (Bytes/sec)" required:"true"`
	// WorkDir is the directory holding the lockfiles used to count
	// concurrently running sabo processes.
	WorkDir string `long:"work-dir" description:"directory for control bandwidth" required:"true"`
	// Version makes the program print version info and exit.
	Version bool `short:"v" long:"version" description:"Show version"`
}

// main parses flags, wraps stdin in a bandwidth-limited reader, and
// streams it to stdout while a background goroutine keeps the limit
// refreshed based on how many sibling sabo processes are running.
func main() {
	opts := cmdOpts{}
	psr := flags.NewParser(&opts, flags.Default)
	_, err := psr.Parse()
	if err != nil {
		// go-flags already printed the parse error / usage text.
		os.Exit(1)
	}

	if opts.Version {
		// BUG FIX: the banner said "motarei" (copy-pasted from another
		// tool); this program is sabo.
		fmt.Printf(`sabo %s
Compiler: %s %s
`,
			Version,
			runtime.Compiler,
			runtime.Version())
		return

	}

	bw, err := humanize.ParseBytes(opts.MaxBandWidth)
	if err != nil {
		fmt.Println("Cannot parse -max-bandwidth", err)
		os.Exit(1)
	}

	ctx := context.Background()
	reader, err := saboreader.NewReaderWithContext(ctx, os.Stdin, filepath.Clean(opts.WorkDir), bw)
	if err != nil {
		log.Fatalf("Cannot create reader:%s", err)
	}
	// NOTE(review): log.Fatalf below exits without running this defer,
	// which leaves the lockfile behind on that path — confirm whether
	// that is acceptable.
	defer reader.CleanUp()
	err = reader.RefreshLimiter(ctx)
	if err != nil {
		log.Fatalf("Cannot create initial bandwidth:%s", err)
	}
	// BUG FIX: RunRefresh loops forever on its ticker (ctx is Background
	// and never cancelled), so calling it synchronously made io.Copy
	// unreachable. Run the refresh loop in its own goroutine.
	go reader.RunRefresh(ctx)

	// BUG FIX: the io.Copy error was silently discarded. Use log.Printf
	// (not Fatalf) so the deferred CleanUp still runs.
	if _, err := io.Copy(os.Stdout, reader); err != nil {
		log.Printf("Copy failed:%s", err)
	}
}
152 changes: 152 additions & 0 deletions saboreader/saboreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package saboreader

import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"regexp"
"sync"
"syscall"
"time"

"golang.org/x/time/rate"
)

// burstLimit is the token-bucket burst size handed to rate.NewLimiter;
// the initial burst is spent immediately in RefreshLimiter so reads are
// throttled from the start.
const burstLimit = 1000 * 1000 * 1000

const (
	// refreshInterval is how often, in seconds, RunRefresh recalculates
	// the per-process bandwidth share.
	refreshInterval = 1
)

// Reader reader with limiter
type Reader struct {
	r       io.Reader       // underlying reader being throttled
	limiter *rate.Limiter   // current limiter; nil until RefreshLimiter installs one
	ctx     context.Context // context passed to limiter.WaitN in Read
	mu      *sync.RWMutex   // guards limiter
	lf      *os.File        // lockfile handle (opened under its pre-rename name)
	wd      string          // work directory holding the lockfiles
	bw      uint64          // configured max bandwidth in bytes/sec
}

// NewReaderWithContext returns a reader that implements io.Reader with rate limiting.
// It registers this process in workDir by creating a flock-ed lockfile
// named "sabo_<bw>_<pid>.lock"; sibling processes count these files to
// split the bandwidth.
func NewReaderWithContext(ctx context.Context, r io.Reader, workDir string, bw uint64) (*Reader, error) {
	_, err := ioutil.ReadDir(workDir)
	if err != nil {
		return nil, fmt.Errorf("Cannot open workdir: %v", err)
	}
	// Create the file under a temporary "_sabo_" name first, then rename
	// it into place only after the flock is held, so other processes
	// never count an unlocked file.
	lockfile := fmt.Sprintf("_sabo_%d_%d.lock", bw, os.Getpid())
	file, err := os.OpenFile(filepath.Join(workDir, lockfile), syscall.O_RDWR|syscall.O_CREAT, 0600)
	if err != nil {
		return nil, fmt.Errorf("Cannot create lockfile in workdir: %v", err)
	}

	err = syscall.Flock(int(file.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
	if err != nil {
		return nil, fmt.Errorf("Cannot lock lockfile in workdir: %v", err)
	}
	err = os.Rename(filepath.Join(workDir, lockfile), filepath.Join(workDir, fmt.Sprintf("sabo_%d_%d.lock", bw, os.Getpid())))
	// BUG FIX: the rename error was assigned but never checked; a failed
	// rename left this process invisible to bandwidth accounting.
	if err != nil {
		return nil, fmt.Errorf("Cannot rename lockfile in workdir: %v", err)
	}
	return &Reader{
		r:   r,
		ctx: ctx,
		mu:  new(sync.RWMutex),
		lf:  file,
		wd:  workDir,
		bw:  bw,
	}, nil
}

// CleanUp clean up lockfile: closes the lock handle (releasing the
// flock) and removes this process's lockfile from the work dir.
func (s *Reader) CleanUp() {
	// BUG FIX: the old code removed filepath.Join(s.wd, s.lf.Name()),
	// but File.Name() already returns the joined path given to OpenFile,
	// and the file was renamed from "_sabo_..." to "sabo_..." during
	// construction — so the remove targeted a nonexistent path and the
	// real lockfile was left behind. Remove the renamed path instead.
	defer os.Remove(filepath.Join(s.wd, fmt.Sprintf("sabo_%d_%d.lock", s.bw, os.Getpid())))
	s.lf.Close()
}

// getRateLimit returns the current limiter (nil if none installed yet).
func (s *Reader) getRateLimit() *rate.Limiter {
	// Read lock is sufficient here; the write lock (used by
	// RefreshLimiter) would needlessly serialize concurrent readers.
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.limiter
}

// Read reads bytes into p. After a successful read it blocks on the
// rate limiter until the bytes just consumed fit within the bandwidth
// budget; with no limiter installed it passes reads straight through.
func (s *Reader) Read(p []byte) (int, error) {
	lim := s.getRateLimit()
	if lim == nil {
		// No limiter yet: unthrottled passthrough.
		return s.r.Read(p)
	}
	n, rerr := s.r.Read(p)
	if rerr != nil {
		return n, rerr
	}
	// Charge the limiter for what was actually read; this is where
	// throttling happens.
	if werr := lim.WaitN(s.ctx, n); werr != nil {
		return n, werr
	}
	return n, nil
}

// RefreshLimiter refresh limiter: counts live sabo processes sharing
// this work dir and bandwidth (a lockfile whose flock cannot be taken
// belongs to a live process), divides the bandwidth evenly among them,
// and installs a fresh limiter. Stale lockfiles (flock succeeds) are
// removed as a side effect.
func (s *Reader) RefreshLimiter(ctx context.Context) error {
	files, err := ioutil.ReadDir(s.wd)
	if err != nil {
		return fmt.Errorf("Cannot open workdir: %v", err)
	}
	// BUG FIX: the pattern was "^sabo_%d" without the trailing "_", so
	// e.g. bw=10 also matched lockfiles for bw=100/1024. Also compile
	// once here instead of per file per call.
	re, err := regexp.Compile(fmt.Sprintf("^sabo_%d_", s.bw))
	if err != nil {
		return fmt.Errorf("Cannot compile lockfile pattern: %v", err)
	}
	locked := uint64(0)
	for _, file := range files {
		if file.IsDir() {
			continue
		}

		if !re.MatchString(file.Name()) {
			continue
		}

		lfile, err := os.OpenFile(filepath.Join(s.wd, file.Name()), syscall.O_RDONLY, 0600)
		if err != nil {
			continue
		}
		err = syscall.Flock(int(lfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
		if err != nil {
			// Could not take the lock: the owning process is alive.
			locked = locked + 1
			lfile.Close()
		} else {
			// Lock acquired: owner is gone; clean up its stale lockfile.
			lfile.Close()
			os.Remove(filepath.Join(s.wd, file.Name()))
		}

	}
	bytesPerSec := float64(s.bw)
	if locked > 0 {
		// BUG FIX: divide in floating point; the old integer division
		// s.bw/locked truncated each process's share.
		bytesPerSec = float64(s.bw) / float64(locked)
	}
	fmt.Fprintf(os.Stderr, "new limit %f\n", bytesPerSec)
	limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
	limiter.AllowN(time.Now(), burstLimit) // spend initial burst
	s.mu.Lock()
	defer s.mu.Unlock()
	s.limiter = limiter
	return nil
}

// RunRefresh refresh limiter regularly: calls RefreshLimiter every
// refreshInterval seconds until ctx is cancelled. Refresh errors are
// logged and the loop keeps running (best effort).
func (s *Reader) RunRefresh(ctx context.Context) {
	ticker := time.NewTicker(refreshInterval * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		// FIX: idiomatic receive; the old "case _ = <-ticker.C" is the
		// non-gofmt'd form flagged by vet/gofmt -s.
		case <-ticker.C:
			err := s.RefreshLimiter(ctx)
			if err != nil {
				log.Printf("Regularly refresh limiter failed:%v", err)
			}
		}
	}
}

0 comments on commit c4c39f1

Please sign in to comment.