Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
Merge branch 'stable'
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Sep 9, 2023
2 parents 3ca4d8d + dde477f commit fe2e33e
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 102 deletions.
222 changes: 154 additions & 68 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package downloader

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
Expand All @@ -33,10 +34,10 @@ import (
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ type AggStats struct {
Progress float32

BytesCompleted, BytesTotal uint64
DroppedCompleted, DroppedTotal uint64
DroppedCompleted, DroppedTotal atomic.Uint64

BytesDownload, BytesUpload uint64
UploadRate, DownloadRate uint64
Expand Down Expand Up @@ -115,7 +116,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) {

statsLock: &sync.RWMutex{},
}
if err := d.addSegments(); err != nil {
if err := d.addSegments(ctx); err != nil {
return nil, err
}
return d, nil
Expand All @@ -126,76 +127,110 @@ func (d *Downloader) MainLoopInBackground(ctx context.Context, silent bool) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.mainLoop(ctx, silent)
if err := d.mainLoop(ctx, silent); err != nil {
if !errors.Is(err, context.Canceled) {
log.Warn("[snapshots]", "err", err)
}
}
}()
}

func (d *Downloader) mainLoop(ctx context.Context, silent bool) {
func (d *Downloader) mainLoop(ctx context.Context, silent bool) error {
var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots))

d.wg.Add(1)
go func() {
defer d.wg.Done()

// Torrents that are already taken care of
torrentMap := map[metainfo.Hash]struct{}{}
// First loop drops torrents that were downloaded or are already complete
// This improves efficiency of download by reducing number of active torrent (empirical observation)
for torrents := d.Torrent().Torrents(); len(torrents) > 0; torrents = d.Torrent().Torrents() {
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
<-t.GotInfo()
if t.Complete.Bool() {
atomic.AddUint64(&d.stats.DroppedCompleted, uint64(t.BytesCompleted()))
atomic.AddUint64(&d.stats.DroppedTotal, uint64(t.Length()))
t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
DownloadLoop:
torrents := d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
if t.Complete.Bool() {
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
go func(t *torrent.Torrent) {
defer sem.Release(1)
<-t.Complete.On()
atomic.AddUint64(&d.stats.DroppedCompleted, uint64(t.BytesCompleted()))
atomic.AddUint64(&d.stats.DroppedTotal, uint64(t.Length()))
t.Drop()
}(t)
continue
}
}
atomic.StoreUint64(&d.stats.DroppedCompleted, 0)
atomic.StoreUint64(&d.stats.DroppedTotal, 0)
d.addSegments()
maps.Clear(torrentMap)
for {
torrents := d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
<-t.GotInfo()
if t.Complete.Bool() {
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
d.wg.Add(1)
go func(t *torrent.Torrent) {
defer d.wg.Done()
defer sem.Release(1)
select {
case <-ctx.Done():
return
case <-t.Complete.On():
}
t.AllowDataDownload()
t.DownloadAll()
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
}(t)
}
if len(torrents) != len(d.Torrent().Torrents()) { //if amount of torrents changed - keep downloading
goto DownloadLoop
}

if err := d.addSegments(ctx); err != nil {
return
}
DownloadLoop2:
torrents = d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
if t.Complete.Bool() {
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
go func(t *torrent.Torrent) {
defer sem.Release(1)
//r := t.NewReader()
//r.SetReadahead(t.Length())
//_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download
<-t.Complete.On()
}(t)
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
time.Sleep(30 * time.Second)
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
d.wg.Add(1)
go func(t *torrent.Torrent) {
defer d.wg.Done()
defer sem.Release(1)
select {
case <-ctx.Done():
return
case <-t.Complete.On():
}
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
}(t)
}
if len(torrents) != len(d.Torrent().Torrents()) { //if amount of torrents changed - keep downloading
goto DownloadLoop2
}
}()

Expand All @@ -210,7 +245,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-statEvery.C:
d.ReCalcStats(statInterval)

Expand Down Expand Up @@ -274,7 +309,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
stats.BytesDownload = uint64(connStats.BytesReadUsefulIntendedData.Int64())
stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64())

stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = atomic.LoadUint64(&stats.DroppedTotal), atomic.LoadUint64(&stats.DroppedCompleted), 0, 0
stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = stats.DroppedTotal.Load(), stats.DroppedCompleted.Load(), 0, 0
for _, t := range torrents {
select {
case <-t.GotInfo():
Expand Down Expand Up @@ -388,7 +423,9 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
logInterval := 20 * time.Second
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
d.wg.Add(1)
go func() {
defer d.wg.Done()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -417,7 +454,47 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil })
}

func (d *Downloader) addSegments() error {
// createMagnetLinkWithInfoHash registers a torrent with the downloader's
// torrent client using only its info hash (via a magnet link built from the
// package-level Trackers announce list).
//
// It returns (true, nil) when the client already has a torrent for that hash,
// (false, nil) when hash is nil or when the magnet was newly added, and
// (false, err) when adding the magnet fails.
//
// A newly added torrent has data download disallowed and data upload allowed.
// A background goroutine, tracked by d.wg, then waits for either ctx to be
// cancelled or the torrent's info to arrive; in the latter case it calls
// CreateTorrentFileIfNotExists for snapDir. A failure there is only logged.
func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, snapDir string) (bool, error) {
	if hash == nil {
		return false, nil
	}

	infoHash := Proto2InfoHash(hash)
	if _, known := d.torrentClient.Torrent(infoHash); known {
		// The client already tracks this hash; nothing to add.
		return true, nil
	}

	announce := &metainfo.MetaInfo{AnnounceList: Trackers}
	magnet := announce.Magnet(&infoHash, nil)
	tr, err := d.torrentClient.AddMagnet(magnet.String())
	if err != nil {
		return false, err
	}
	tr.DisallowDataDownload()
	tr.AllowDataUpload()

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()

		// Stop waiting as soon as the caller's context is cancelled.
		select {
		case <-ctx.Done():
			return
		case <-tr.GotInfo():
		}

		meta := tr.Metainfo()
		if err := CreateTorrentFileIfNotExists(snapDir, tr.Info(), &meta); err != nil {
			log.Warn("[downloader] create torrent file", "err", err)
		}
	}()
	return false, nil
}

func (d *Downloader) addSegments(ctx context.Context) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
_, err := BuildTorrentFilesIfNeed(context.Background(), d.SnapDir())
Expand All @@ -433,27 +510,36 @@ func (d *Downloader) addSegments() error {
return fmt.Errorf("seedableHistorySnapshots: %w", err)
}
files = append(files, files2...)
wg := &sync.WaitGroup{}

g, ctx := errgroup.WithContext(ctx)
i := atomic.Int64{}
for _, f := range files {
wg.Add(1)
go func(f string) {
defer wg.Done()
f := f
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := AddSegment(f, d.cfg.DataDir, d.torrentClient)
if err != nil {
log.Warn("[snapshots] AddSegment", "err", err)
return
return err
}

i.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
log.Info("[snpshots] initializing", "files", fmt.Sprintf("%d/%d", i.Load(), len(files)))
default:
}
}(f)
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
wg.Wait()
return nil
}

Expand Down
37 changes: 3 additions & 34 deletions downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
snapDir := s.d.SnapDir()
for i, it := range request.Items {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-logEvery.C:
log.Info("[snapshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items)))
default:
Expand All @@ -72,7 +74,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}

_, err := createMagnetLinkWithInfoHash(it.TorrentHash, torrentClient, snapDir)
_, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, snapDir)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,36 +139,3 @@ func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.C
}

// createMagnetLinkWithInfoHash handles the case where neither the .seg nor
// the .torrent file is available, so the torrent is obtained through its
// info hash: a magnet link is built from the package-level Trackers announce
// list and added to torrentClient.
//
// It returns (true, nil) when torrentClient already has a torrent for that
// hash, (false, nil) when hash is nil or when the magnet was newly added, and
// (false, err) when adding the magnet fails.
//
// A newly added torrent has data download disallowed and data upload allowed.
// A background goroutine then blocks until the torrent's info arrives and
// calls CreateTorrentFileIfNotExists for snapDir; a failure there is only
// logged. NOTE(review): that goroutine has no cancellation path and is not
// tracked by any WaitGroup in this function.
func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.Client, snapDir string) (bool, error) {
	if hash == nil {
		return false, nil
	}

	infoHash := Proto2InfoHash(hash)
	if _, found := torrentClient.Torrent(infoHash); found {
		// Already registered with the client; nothing to add.
		return true, nil
	}

	source := &metainfo.MetaInfo{AnnounceList: Trackers}
	magnet := source.Magnet(&infoHash, nil)
	added, err := torrentClient.AddMagnet(magnet.String())
	if err != nil {
		return false, err
	}
	added.DisallowDataDownload()
	added.AllowDataUpload()

	go func() {
		<-added.GotInfo()

		meta := added.Metainfo()
		if err := CreateTorrentFileIfNotExists(snapDir, added.Info(), &meta); err != nil {
			log.Warn("[downloader] create torrent file", "err", err)
		}
	}()
	return false, nil
}

0 comments on commit fe2e33e

Please sign in to comment.