Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base cmm2 #147

Merged
merged 4 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
DefaultRpcAddr2 = "wss://testnet-rpc1.cess.cloud/ws/"
//
DefaultBootNodeAddr = "_dnsaddr.boot-kldr-testnet.cess.cloud"
//
DefaultDeossAddr = "https://deoss-pub-gateway.cess.cloud/"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion configs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// Name is the name of the program
Name = "bucket"
// version
Version = "v0.6.3 pre-release 230718.2254"
Version = "v0.6.3 pre-release 230720.1730"
// Description is the description of the program
Description = "Storage node implementation in CESS networks"
// NameSpace is the cached namespace
Expand Down
20 changes: 10 additions & 10 deletions node/challengeMgt.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,26 +319,28 @@ func (n *Node) proofAssignedInfo(ihash, shash []byte, randomIndexList []uint32,
qslice[k].I = uint64(v)
qslice[k].V = random[k]
}

sign, err := n.Sign(n.GetPeerPublickey())
if err != nil {
return "", code, errors.Wrapf(err, "[Sign]")
}

if !n.HasPeer(peerid.Pretty()) {
addr, err := n.DHTFindPeer(peerid.Pretty())
addr, ok := n.GetPeer(peerid.Pretty())
if !ok {
addr, err = n.DHTFindPeer(peerid.Pretty())
if err != nil {
return "", code, fmt.Errorf("No verification proof tee found: %s", peerid.Pretty())
}
err = n.Connect(n.GetCtxQueryFromCtxCancel(), addr)
if err != nil {
return "", code, fmt.Errorf("Failed to connect to verification proof tee: %s", peerid.Pretty())
}
}

err = n.Connect(n.GetCtxQueryFromCtxCancel(), addr)
if err != nil {
return "", code, fmt.Errorf("Failed to connect to verification proof tee: %s", peerid.Pretty())
}

code, err = n.AggrProofReq(peerid, ihash, shash, qslice, n.GetStakingPublickey(), sign)
if err != nil || code != 0 {
n.Chal("err", fmt.Sprintf("AggrProofReq err: %v, code: %d", err, code))
return "", code, errors.New(fmt.Sprintf("AggrProofReq err: %v, code: %d", err, code))
return "", code, errors.New(fmt.Sprintf("AggrProofReq to %s err: %v, code: %d", peerid.Pretty(), err, code))

}
n.Chal("info", fmt.Sprintf("Aggr proof response suc: %s", peerid.Pretty()))
Expand All @@ -348,14 +350,12 @@ func (n *Node) proofAssignedInfo(ihash, shash []byte, randomIndexList []uint32,

code, err = n.FileReq(peerid, idleProofFileHashs, pb.FileType_IdleMu, n.GetDirs().IproofFile)
if err != nil || code != 0 {
n.Chal("err", fmt.Sprintf("FileReq FileType_IdleMu err: %v,code: %d", err, code))
return "", code, errors.New(fmt.Sprintf("FileReq FileType_IdleMu err: %v,code: %d", err, code))
}
n.Chal("info", fmt.Sprintf("Aggr proof idle file response suc: %s", peerid.Pretty()))

code, err = n.FileReq(peerid, serviceProofFileHashs, pb.FileType_CustomMu, n.GetDirs().SproofFile)
if err != nil || code != 0 {
n.Chal("err", fmt.Sprintf("FileReq FileType_IdleMu err: %v,code: %d", err, code))
return peerid.Pretty(), code, errors.New(fmt.Sprintf("FileReq FileType_IdleMu err: %v,code: %d", err, code))
}

Expand Down
46 changes: 40 additions & 6 deletions node/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
package node

import (
"encoding/json"
"fmt"
"time"

"github.com/CESSProject/cess-bucket/configs"
"github.com/CESSProject/cess-bucket/pkg/utils"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"golang.org/x/time/rate"
)

Expand All @@ -28,8 +32,22 @@ func (n *Node) discoverMgt(ch chan bool) {
if err != nil {
n.Discover("err", err.Error())
}
data, err := utils.QueryPeers(configs.DefaultDeossAddr)
if err != nil {
n.Discover("err", err.Error())
} else {
err = json.Unmarshal(data, &n.peers)
if err != nil {
n.Discover("err", err.Error())
} else {
err = n.SavePeersToDisk(n.peersPath)
if err != nil {
n.Discover("err", err.Error())
}
}
}

tickDiscover := time.NewTicker(time.Minute * 5)
tickDiscover := time.NewTicker(time.Minute * 10)
defer tickDiscover.Stop()

var r1 = rate.Every(time.Second * 5)
Expand All @@ -41,19 +59,35 @@ func (n *Node) discoverMgt(ch chan bool) {

for {
select {
case peer, _ := <-n.GetDiscoveredPeers():
case discoveredPeer, _ := <-n.GetDiscoveredPeers():
if limit.Allow() {
n.Discover("info", "reset")
tickDiscover.Reset(time.Minute * 5)
tickDiscover.Reset(time.Minute * 10)
}
if len(peer.Responses) == 0 {
if len(discoveredPeer.Responses) == 0 {
break
}
for _, v := range peer.Responses {
n.SavePeer(v.ID.Pretty(), *v)
for _, v := range discoveredPeer.Responses {
var addrInfo peer.AddrInfo
var addrs []multiaddr.Multiaddr
for _, addr := range v.Addrs {
if ipv4, ok := utils.FildIpv4([]byte(addr.String())); ok {
if ok, err := utils.IsIntranetIpv4(ipv4); err == nil {
if !ok {
addrs = append(addrs, addr)
}
}
}
}
if len(addrs) > 0 {
addrInfo.ID = v.ID
addrInfo.Addrs = utils.RemoveRepeatedAddr(addrs)
n.SavePeer(v.ID.Pretty(), addrInfo)
}
}
case <-tickDiscover.C:
if printLimit.Allow() {
n.RemovePeerIntranetAddr()
err = n.SavePeersToDisk(n.peersPath)
if err != nil {
n.Discover("err", err.Error())
Expand Down
31 changes: 31 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"github.com/CESSProject/cess-bucket/pkg/confile"
"github.com/CESSProject/cess-bucket/pkg/logger"
"github.com/CESSProject/cess-bucket/pkg/proof"
"github.com/CESSProject/cess-bucket/pkg/utils"
"github.com/CESSProject/cess-go-sdk/core/pattern"
"github.com/CESSProject/cess-go-sdk/core/sdk"
sutils "github.com/CESSProject/cess-go-sdk/core/utils"
"github.com/CESSProject/p2p-go/out"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

type Node struct {
Expand Down Expand Up @@ -141,6 +143,10 @@ func (n *Node) SavePeer(peerid string, addr peer.AddrInfo) {
}
}

func (n *Node) SaveOrUpdatePeerUnSafe(peerid string, addr peer.AddrInfo) {
n.peers[peerid] = addr
}

func (n *Node) HasPeer(peerid string) bool {
n.peerLock.RLock()
_, ok := n.peers[peerid]
Expand All @@ -167,6 +173,31 @@ func (n *Node) GetAllPeerId() []string {
return result
}

func (n *Node) RemovePeerIntranetAddr() {
n.peerLock.Lock()
defer n.peerLock.Unlock()
for k, v := range n.peers {
var addrInfo peer.AddrInfo
var addrs []multiaddr.Multiaddr
for _, addr := range v.Addrs {
if ipv4, ok := utils.FildIpv4([]byte(addr.String())); ok {
if ok, err := utils.IsIntranetIpv4(ipv4); err == nil {
if !ok {
addrs = append(addrs, addr)
}
}
}
}
if len(addrs) > 0 {
addrInfo.ID = v.ID
addrInfo.Addrs = utils.RemoveRepeatedAddr(addrs)
n.SaveOrUpdatePeerUnSafe(v.ID.Pretty(), addrInfo)
} else {
delete(n.peers, k)
}
}
}

func (n *Node) SavePeersToDisk(path string) error {
n.peerLock.RLock()
buf, err := json.Marshal(n.peers)
Expand Down
42 changes: 31 additions & 11 deletions node/restoreMgt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,18 @@ func (n *Node) restoreMgt(ch chan bool) {
n.Restore("info", ">>>>> start restoreMgt <<<<<")
for {
for n.GetChainState() {
err := n.inspector()
time.Sleep(time.Minute)
minerInfo, err := n.QueryStorageMiner(n.GetStakingPublickey())
if err != nil {
time.Sleep(time.Minute)
continue
}

if string(minerInfo.State) != "positive" {
continue
}

err = n.inspector()
if err != nil {
n.Restore("err", err.Error())
time.Sleep(pattern.BlockInterval)
Expand All @@ -43,7 +54,6 @@ func (n *Node) restoreMgt(ch chan bool) {
n.Restore("err", err.Error())
time.Sleep(pattern.BlockInterval)
}
time.Sleep(time.Minute)
}
time.Sleep(pattern.BlockInterval)
}
Expand Down Expand Up @@ -209,12 +219,13 @@ func (n *Node) claimRestoreOrder() error {
var err error
val, _ := n.QueryPrefixKeyList(Cach_prefix_recovery)
for _, v := range val {
_, err = n.QueryRestoralOrder(v)
restoreOrder, err := n.QueryRestoralOrder(v)
if err != nil {
if err.Error() == pattern.ERR_Empty {
n.Delete([]byte(Cach_prefix_recovery + v))
continue
}
continue
}

b, err := n.Get([]byte(Cach_prefix_recovery + v))
Expand All @@ -228,6 +239,11 @@ func (n *Node) claimRestoreOrder() error {
n.Restore("err", fmt.Sprintf("[restoreAFragment %s-%s] %v", string(b), v, err))
continue
}

if !sutils.CompareSlice(restoreOrder.Miner[:], n.GetStakingPublickey()) {
continue
}

txhash, err := n.RestoralComplete(v)
if err != nil {
n.Restore("err", fmt.Sprintf("[RestoralComplete %s-%s] %v", string(b), v, err))
Expand Down Expand Up @@ -265,7 +281,6 @@ func (n *Node) claimRestoreOrder() error {

func (n *Node) restoreAFragment(roothash, framentHash, recoveryPath string) error {
var err error
var id peer.ID
var miner pattern.MinerInfo
n.Restore("info", fmt.Sprintf("[%s] To restore the fragment: %s", roothash, framentHash))
n.Restore("info", fmt.Sprintf("[%s] Restore path: %s", roothash, recoveryPath))
Expand Down Expand Up @@ -324,24 +339,29 @@ func (n *Node) restoreAFragment(roothash, framentHash, recoveryPath string) erro
}
continue
}
minerAcc, _ := sutils.EncodePublicKeyAsCessAccount(v.Miner[:])
miner, err = n.QueryStorageMiner(v.Miner[:])
if err != nil {
n.Restore("err", fmt.Sprintf("[QueryStorageMiner %s]: %v", minerAcc, err))
continue
}
id, err = peer.Decode(base58.Encode([]byte(string(miner.PeerId[:]))))
if err != nil {
continue
}
addr, ok := n.GetPeer(id.Pretty())

peerid := base58.Encode([]byte(string(miner.PeerId[:])))

addr, ok := n.GetPeer(peerid)
if !ok {
n.Restore("err", fmt.Sprintf("Not found miner: %s, %s", minerAcc, peerid))
continue
}

err = n.Connect(n.GetCtxQueryFromCtxCancel(), addr)
if err != nil {
n.Restore("err", fmt.Sprintf("Connect to miner failed: %s, %s, err: %v", minerAcc, peerid, err))
continue
}
n.Restore("info", fmt.Sprintf("[%s] will read file from %s: %s", id.Pretty(), roothash, string(v.Hash[:])))
err = n.ReadFileAction(id, roothash, string(v.Hash[:]), filepath.Join(n.GetDirs().FileDir, roothash, string(v.Hash[:])), pattern.FragmentSize)

n.Restore("info", fmt.Sprintf("[%s] will read file from %s: %s", peerid, roothash, string(v.Hash[:])))
err = n.ReadFileAction(addr.ID, roothash, string(v.Hash[:]), filepath.Join(n.GetDirs().FileDir, roothash, string(v.Hash[:])), pattern.FragmentSize)
if err != nil {
os.Remove(filepath.Join(n.GetDirs().FileDir, roothash, string(v.Hash[:])))
n.Restore("err", fmt.Sprintf("[ReadFileAction] %v", err))
Expand Down
Loading