Skip to content

Commit

Permalink
Revert "Issue 1874 - Part1 Reduce calls to the Agbot Database"
Browse files Browse the repository at this point in the history
This reverts commit e74a812.
  • Loading branch information
Dave Booz committed Jul 10, 2020
1 parent d524162 commit 636aefb
Show file tree
Hide file tree
Showing 24 changed files with 597 additions and 1,181 deletions.
122 changes: 73 additions & 49 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (w *AgreementBotWorker) Initialize() bool {
// to initiate the protocol.
for protocolName, _ := range w.pm.GetAllAgreementProtocols() {
if policy.SupportedAgreementProtocol(protocolName) {
cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.retryAgreements)
cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM)
cph.Initialize()
w.consumerPH.Add(protocolName, cph)
} else {
Expand Down Expand Up @@ -424,7 +424,7 @@ func (w *AgreementBotWorker) CommandHandler(command worker.Command) bool {
// Update the protocol handler map and make sure there are workers available if the policy has a new protocol in it.
if !w.consumerPH.Has(agp.Name) {
glog.V(3).Infof("AgreementBotWorker creating worker pool for new agreement protocol %v", agp.Name)
cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.retryAgreements)
cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM)
cph.Initialize()
w.consumerPH.Add(agp.Name, cph)
}
Expand Down Expand Up @@ -709,35 +709,20 @@ func (w *AgreementBotWorker) NoWorkHandler() {
// pending state.
glog.V(4).Infof("AgreementBotWorker Looking for pending agreements before shutting down.")

// Look at all agreements across all protocols.
agreementPendingFilter := func() persistence.AFilter {
return func(a persistence.Agreement) bool { return a.AgreementFinalizedTime == 0 && a.AgreementTimedout == 0 }
}

// Look at all agreements across all protocols,
foundPending := false
for _, agp := range policy.AllAgreementProtocols() {

// Find all agreements that are in progress, agreements that are not archived and dont have either a finalized time or a timeeout time.
nextAgreementId := ""
for {
lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetPendingFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit())
if err != nil {
glog.Errorf("AgreementBotWorker unable to read agreements from database, error: %v", err)
w.Messages() <- events.NewNodeShutdownCompleteMessage(events.AGBOT_QUIESCE_COMPLETE, err.Error())
break
}

// There are pending agreements, so exit the scan loop.
if len(agreements) != 0 {
foundPending = true
break
}

// If there are no more agreements to iterate, then break out of the loop.
if lastAgreementId == "" {
break
} else {
nextAgreementId = lastAgreementId
}
}

if foundPending {
if agreements, err := w.db.FindAgreements([]persistence.AFilter{agreementPendingFilter(), persistence.UnarchivedAFilter()}, agp); err != nil {
glog.Errorf("AgreementBotWorker unable to read agreements from database, error: %v", err)
w.Messages() <- events.NewNodeShutdownCompleteMessage(events.AGBOT_QUIESCE_COMPLETE, err.Error())
} else if len(agreements) != 0 {
foundPending = true
break
}

Expand Down Expand Up @@ -838,6 +823,26 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy
return err
} else {

// Get all the agreements for this policy that are still active.
pendingAgreementFilter := func() persistence.AFilter {
return func(a persistence.Agreement) bool {
return a.PolicyName == consumerPolicy.Header.Name && a.AgreementTimedout == 0
}
}

ags := make(map[string][]persistence.Agreement)

// The agreements with this policy could be part of any supported agreement protocol.
for _, agp := range policy.AllAgreementProtocols() {
// Find all agreements that are in progress. They might be waiting for a reply or not yet finalized.
// TODO: To support more than 1 agreement (maxagreements > 1) with this device for this policy, we need to adjust this logic.
if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), pendingAgreementFilter()}, agp); err != nil {
glog.Errorf("AgreementBotWorker received error trying to find pending agreements for protocol %v: %v", agp, err)
} else {
ags[agp] = agreements
}
}

for _, dev := range *devices {

if filter != nil && filter(dev.Id) {
Expand All @@ -847,6 +852,12 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy
glog.V(3).Infof("AgreementBotWorker picked up %v for policy %v.", dev.ShortString(), consumerPolicy.Header.Name)
glog.V(5).Infof("AgreementBotWorker picked up %v", dev)

// Check for agreements already in progress with this device
if found := w.alreadyMakingAgreementWith(&dev, consumerPolicy, ags); found {
glog.V(5).Infof("AgreementBotWorker skipping device id %v, agreement attempt already in progress with %v", dev.Id, consumerPolicy.Header.Name)
continue
}

// If the device is not ready to make agreements yet, then skip it.
if dev.PublicKey == "" {
glog.V(5).Infof("AgreementBotWorker skipping device id %v, node is not ready to exchange messages", dev.Id)
Expand Down Expand Up @@ -893,6 +904,28 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy

}

// Check all agreement protocol buckets to see if there are any agreements with this device.
// Return true if there is already an agreement for this node and policy.
func (w *AgreementBotWorker) alreadyMakingAgreementWith(dev *exchange.SearchResultDevice, consumerPolicy *policy.Policy, allAgreements map[string][]persistence.Agreement) bool {

// Check to see if we're already doing something with this device.
for _, ags := range allAgreements {
// Look for any agreements with the current node.
for _, ag := range ags {
if ag.DeviceId == dev.Id {
if ag.AgreementFinalizedTime != 0 {
glog.V(5).Infof("AgreementBotWorker sending agreement verify for %v", ag.CurrentAgreementId)
w.consumerPH.Get(ag.AgreementProtocol).VerifyAgreement(&ag, w.consumerPH.Get(ag.AgreementProtocol))
w.retryAgreements.AddRetry(consumerPolicy.Header.Name, dev.Id)
}
return true
}
}
}
return false

}

func (w *AgreementBotWorker) policyWatcher(name string, quit chan bool) {

worker.GetWorkerStatusManager().SetSubworkerStatus(w.GetName(), name, worker.STATUS_STARTED)
Expand Down Expand Up @@ -1125,22 +1158,10 @@ func (w *AgreementBotWorker) syncOnInit() error {
// Search all agreement protocol buckets
for _, agp := range policy.AllAgreementProtocols() {

neededBCInstances := make(map[string]map[string]map[string]bool)

// Loop through our database and check each record for accuracy with the exchange and the blockchain
nextAgreementId := ""
for {
lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetUnarchivedFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit())
if err != nil {
return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err)))
}
if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil {

// If there are no more agreements to iterate, then break out of the loop.
if lastAgreementId == "" {
break
} else {
nextAgreementId = lastAgreementId
}
neededBCInstances := make(map[string]map[string]map[string]bool)

for _, ag := range agreements {

Expand Down Expand Up @@ -1223,17 +1244,20 @@ func (w *AgreementBotWorker) syncOnInit() error {
}

}
}

// Fire off start requests for each BC client that we need running. The blockchain worker and the container worker will tolerate
// a start request for containers that are already running.
glog.V(3).Infof(AWlogString(fmt.Sprintf("discovered BC instances in DB %v", neededBCInstances)))
for org, typeMap := range neededBCInstances {
for typeName, instMap := range typeMap {
for instName, _ := range instMap {
w.Messages() <- events.NewNewBCContainerMessage(events.NEW_BC_CLIENT, typeName, instName, org, w.GetExchangeURL(), w.GetExchangeId(), w.GetExchangeToken())
// Fire off start requests for each BC client that we need running. The blockchain worker and the container worker will tolerate
// a start request for containers that are already running.
glog.V(3).Infof(AWlogString(fmt.Sprintf("discovered BC instances in DB %v", neededBCInstances)))
for org, typeMap := range neededBCInstances {
for typeName, instMap := range typeMap {
for instName, _ := range instMap {
w.Messages() <- events.NewNewBCContainerMessage(events.NEW_BC_CLIENT, typeName, instName, org, w.GetExchangeURL(), w.GetExchangeId(), w.GetExchangeToken())
}
}
}

} else {
return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err)))
}
}

Expand Down
61 changes: 13 additions & 48 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,15 @@ type HandleWorkloadUpgrade struct {
AgreementId string
Protocol string
Device string
Org string
PolicyName string
}

func NewHandleWorkloadUpgrade(agId string, protocol string, device string, org string, policyName string) AgreementWork {
func NewHandleWorkloadUpgrade(agId string, protocol string, device string, policyName string) AgreementWork {
return HandleWorkloadUpgrade{
workType: WORKLOAD_UPGRADE,
AgreementId: agId,
Device: device,
Protocol: protocol,
Org: org,
PolicyName: policyName,
}
}
Expand Down Expand Up @@ -224,15 +222,14 @@ type AgreementWorker interface {
}

type BaseAgreementWorker struct {
pm *policy.PolicyManager
db persistence.AgbotDatabase
config *config.HorizonConfig
alm *AgreementLockManager
workerID string
httpClient *http.Client
ec *worker.BaseExchangeContext
mmsObjMgr *MMSObjectPolicyManager
retryAgreements *RetryAgreements
pm *policy.PolicyManager
db persistence.AgbotDatabase
config *config.HorizonConfig
alm *AgreementLockManager
workerID string
httpClient *http.Client
ec *worker.BaseExchangeContext
mmsObjMgr *MMSObjectPolicyManager
}

// A local implementation of the ExchangeContext interface because Agbot agreement workers are not full featured workers.
Expand Down Expand Up @@ -280,40 +277,8 @@ func (b *BaseAgreementWorker) AgreementLockManager() *AgreementLockManager {
return b.alm
}

// Return true if there is already an agreement for this node and policy.
func (b *BaseAgreementWorker) alreadyMakingAgreementWith(workerId string, cph ConsumerProtocolHandler, wi *InitiateAgreement) bool {

// If there is at least 1 agreement that matches the node and policy, that's enough to skip the node.
_, ags, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetNodeFilter(wi.Device.Id), b.db.GetPolicyFilter(wi.Org, wi.ConsumerPolicy.Header.Name), b.db.GetActiveFilter()}, cph.Name(), "", 1)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error searching for existing agreements for %v, %v, error: %v", wi.Device.Id, wi.ConsumerPolicy.Header.Name, err)))
return false
}

// If an in-progress agreement was returned from the query, check to make sure it is not finalized. If it's finalized, then
// the exchange should not have returned the node in the search results. The agbot might be out of sync, so send an
// agreement verify request to the node to find out if the agreement is valid or not.
if ags != nil && len(ags) > 0 {
if ags[0].AgreementFinalizedTime != 0 {
glog.V(5).Infof(BAWlogstring(workerId, fmt.Sprintf("sending agreement verify for %v", ags[0].CurrentAgreementId)))
cph.VerifyAgreement(&ags[0], cph)
b.retryAgreements.AddRetry(wi.ConsumerPolicy.Header.Name, wi.Device.Id)
}
return true
}

return false
}

func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler, wi *InitiateAgreement, random *rand.Rand, workerId string) {

// Check the database to see if the node already has an agreement with the policy. This check is done here on the protocol worker thread
// in order to offload the DB call from the main agbot thread.
if found := b.alreadyMakingAgreementWith(workerId, cph, wi); found {
glog.Infof(BAWlogstring(workerId, fmt.Sprintf("skipping device id %v, agreement attempt already in progress with %v", wi.Device.Id, wi.ConsumerPolicy.Header.Name)))
return
}

// Generate an agreement ID
agreementIdString, aerr := cutil.GenerateAgreementId()
if aerr != nil {
Expand Down Expand Up @@ -753,7 +718,7 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,

// Find the saved agreement in the database. The returned agreement might be archived. If it's archived, then it is our agreement
// so we will delete the protocol msg.
if agreement, err := b.db.FindSingleAgreementByAgreementId(reply.AgreementId(), cph.Name(), []persistence.AgbotDBFilter{}); err != nil {
if agreement, err := b.db.FindSingleAgreementByAgreementId(reply.AgreementId(), cph.Name(), []persistence.AFilter{}); err != nil {
// A DB error occurred so we dont know if this is our agreement or not. Leave it alone until the agbot is restarted
// or until the DB error is resolved.
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying pending agreement %v, error: %v", reply.AgreementId(), err)))
Expand Down Expand Up @@ -950,7 +915,7 @@ func (b *BaseAgreementWorker) HandleDataReceivedAck(cph ConsumerProtocolHandler,

// The agreement might be archived in this agbot's partition. If it's archived, then it is our agreement
// so we will delete the protocol msg, but we will ignore the ack msg.
if ag, err := b.db.FindSingleAgreementByAgreementId(drAck.AgreementId(), cph.Name(), []persistence.AgbotDBFilter{}); err != nil {
if ag, err := b.db.FindSingleAgreementByAgreementId(drAck.AgreementId(), cph.Name(), []persistence.AFilter{}); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying agreement %v, error: %v", drAck.AgreementId(), err)))
deleteMessage = false
} else if ag != nil && ag.Archived {
Expand Down Expand Up @@ -989,7 +954,7 @@ func (b *BaseAgreementWorker) HandleWorkloadUpgrade(cph ConsumerProtocolHandler,
// grab the agreement id lock, cancel the agreement and delete the workload usage record.

if wi.AgreementId == "" {
if _, ags, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetNodeFilter(wi.Device), b.db.GetPolicyFilter(wi.Org, wi.PolicyName)}, cph.Name(), "", 999); err != nil {
if ags, err := b.db.FindAgreements([]persistence.AFilter{persistence.DevPolAFilter(wi.Device, wi.PolicyName)}, cph.Name()); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error finding agreement for device %v and policyName %v, error: %v", wi.Device, wi.PolicyName, err)))
} else if len(ags) == 0 {
// If there is no agreement found, is it a problem? We could have caught the system in a state where there is no
Expand Down Expand Up @@ -1117,7 +1082,7 @@ func (b *BaseAgreementWorker) ExternalCancel(cph ConsumerProtocolHandler, agreem
glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("starting deferred cancel for %v", agreementId)))

// Find the agreement record
if ag, err := b.db.FindSingleAgreementByAgreementId(agreementId, cph.Name(), []persistence.AgbotDBFilter{}); err != nil {
if ag, err := b.db.FindSingleAgreementByAgreementId(agreementId, cph.Name(), []persistence.AFilter{}); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying agreement %v from database, error: %v", agreementId, err)))
} else if ag == nil {
glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("nothing to terminate for agreement %v, no database record.", agreementId)))
Expand Down
29 changes: 9 additions & 20 deletions agreementbot/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) {
id := pathVars["id"]

if id != "" {
if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{}); err != nil {
if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AFilter{}); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", id, err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
} else if ag == nil {
Expand All @@ -228,22 +228,11 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) {
wrap[agreementsKey][activeKey] = []persistence.Agreement{}

for _, agp := range policy.AllAgreementProtocols() {

nextAgreementId := ""
for {
lastAgreementId, ags, err := a.db.FindAgreementsPage([]persistence.AgbotDBFilter{}, agp, nextAgreementId, a.Config.GetAgbotDBLimit())
if err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding all agreements, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

// If there are no more agreements to iterate, then break out of the loop.
if lastAgreementId == "" {
break
} else {
nextAgreementId = lastAgreementId
}
if ags, err := a.db.FindAgreements([]persistence.AFilter{}, agp); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding all agreements, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
} else {

for _, agreement := range ags {
// The archived agreements and the agreements being terminated are returned as archived.
Expand Down Expand Up @@ -275,7 +264,7 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) {
}
glog.V(3).Infof(APIlogString(fmt.Sprintf("handling DELETE of agreement: %v", r)))

if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{a.db.GetUnarchivedFilter()}); err != nil {
if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AFilter{persistence.UnarchivedAFilter()}); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", id, err)))
w.WriteHeader(http.StatusInternalServerError)
} else if ag == nil {
Expand Down Expand Up @@ -399,7 +388,7 @@ func (a *API) policy(w http.ResponseWriter, r *http.Request) {
protocol := ""
// The body is syntacticly correct, verify that the agreement id matches up with the device id and policy name.
if upgrade.AgreementId != "" {
if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(upgrade.AgreementId, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{a.db.GetUnarchivedFilter()}); err != nil {
if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(upgrade.AgreementId, policy.AllAgreementProtocols(), []persistence.AFilter{persistence.UnarchivedAFilter()}); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", upgrade.AgreementId, err)))
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down Expand Up @@ -438,7 +427,7 @@ func (a *API) policy(w http.ResponseWriter, r *http.Request) {
}

// If we got this far, begin workload upgrade processing.
a.Messages() <- events.NewABApiWorkloadUpgradeMessage(events.WORKLOAD_UPGRADE, protocol, upgrade.AgreementId, upgrade.Device, upgrade.Org, policyName)
a.Messages() <- events.NewABApiWorkloadUpgradeMessage(events.WORKLOAD_UPGRADE, protocol, upgrade.AgreementId, upgrade.Device, policyName)
w.WriteHeader(http.StatusOK)

case "OPTIONS":
Expand Down
Loading

0 comments on commit 636aefb

Please sign in to comment.