Skip to content

Commit

Permalink
added changes for increase throttling recovery speed
Browse files Browse the repository at this point in the history
  • Loading branch information
rajesh-1983 committed May 13, 2024
1 parent 53c76c1 commit 948a25f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 59 deletions.
16 changes: 13 additions & 3 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package lib

import (
"fmt"
"math"
"regexp"
"sync"
"sync/atomic"
Expand All @@ -31,6 +32,7 @@ type BindThrottle struct {
Name string
Value string
Sqlhash uint32
lowWorkerUsage float64
RecentAttempt atomic.Value // time.Time
AllowEveryX int
AllowEveryXCount int
Expand Down Expand Up @@ -144,15 +146,23 @@ func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string
// update based on usage
if heavyUsage {
entry.incrAllowEveryX()
entry.lowWorkerUsage -= 1
if entry.lowWorkerUsage < 0 {
entry.lowWorkerUsage = 0
}
} else {
entry.decrAllowEveryX(2)
entry.lowWorkerUsage += 1
}

// check if not used in a while
now := time.Now()
recent := entry.RecentAttempt.Load().(*time.Time)
gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec
entry.decrAllowEveryX(int(gap))
throttleReductionBase := now.Sub(*recent).Seconds()
if throttleReductionBase < 1 {
throttleReductionBase = 1
}
throttleReductionRate := throttleReductionBase * GetConfig().BindEvictionDecrPerSec * math.Exp(entry.lowWorkerUsage)
entry.decrAllowEveryX(int(throttleReductionRate))
if entry.AllowEveryX == 0 {
return false, nil
}
Expand Down
4 changes: 1 addition & 3 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ package lib
import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"

Expand Down Expand Up @@ -423,7 +421,7 @@ func InitConfig() error {
default_evict_names := fmt.Sprintf("id,num,%s", SrcPrefixAppKey)
gAppConfig.BindEvictionNames = cdb.GetOrDefaultString("bind_eviction_names", default_evict_names)
gAppConfig.BindEvictionThresholdPct = cdb.GetOrDefaultInt("bind_eviction_threshold_pct", 60)
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", strconv.Itoa(math.MaxInt32)), //Setting the value to MaxInt32 will help disable automatic bind throttle. If a specific OCC pool wants to enable automatic bind throttle, then it requires providing a value in hera.txt.
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10.0"),
"%f", &gAppConfig.BindEvictionDecrPerSec)

gAppConfig.SkipEvictRegex = cdb.GetOrDefaultString("skip_eviction_host_prefix", "")
Expand Down
66 changes: 37 additions & 29 deletions lib/querybindblocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ import (
"github.com/paypal/hera/utility/logger"
)


type QueryBindBlockerEntry struct {
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Bindvarvalue string // when set to "BLOCKALLVALUES" should block all sqltext queries
Blockperc int
Heramodule string
Blockperc int
Heramodule string
}

type QueryBindBlockerCfg struct {
Expand All @@ -48,7 +47,10 @@ type QueryBindBlockerCfg struct {
// check by sqltext prefix (delay to end)
}

func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool,string) {
var lastLoggingTime time.Time
var defaultQBBTableMissingErrorLoggingInterval = 2 * time.Hour

func (cfg *QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool, string) {
sqlhash := uint32(utility.GetSQLHash(sqltext))
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, fmt.Sprintf("query bind blocker sqlhash and text %d %s", sqlhash, sqltext))
Expand All @@ -70,7 +72,7 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
byBindValue, ok := byBindName[bindPairs[i]]
if !ok {
// strip numeric suffix to try to match
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i],"")
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i], "")
byBindValue, ok = byBindName[withoutNumSuffix]
if !ok {
continue
Expand Down Expand Up @@ -118,28 +120,27 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
var g_module string
var gQueryBindBlockerCfg atomic.Value

func GetQueryBindBlockerCfg() (*QueryBindBlockerCfg) {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
func GetQueryBindBlockerCfg() *QueryBindBlockerCfg {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
}


func InitQueryBindBlocker(modName string) {
g_module = modName

db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
logger.GetLogger().Log(logger.Alert, "Loading query bind blocker - conn err ", err)
return
}
db.SetMaxIdleConns(0)

return
}
db.SetMaxIdleConns(0)
go func() {
time.Sleep(4*time.Second)
time.Sleep(4 * time.Second)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker - initial")

loadBlockQueryBind(db)
c := time.Tick(11 * time.Second)
for now := range c {
Expand All @@ -152,11 +153,12 @@ func InitQueryBindBlocker(modName string) {
func loadBlockQueryBind(db *sql.DB) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
conn, err := db.Conn(ctx);
conn, err := db.Conn(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (conn) loading query bind blocker:", err)
return
}

defer conn.Close()
q := fmt.Sprintf("SELECT /*queryBindBlocker*/ %ssqlhash, %ssqltext, bindvarname, bindvarvalue, blockperc, %smodule FROM %s_rate_limiter where %smodule='%s'", GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().ManagementTablePrefix, GetConfig().StateLogPrefix, g_module)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker meta-sql "+q)
Expand All @@ -167,12 +169,18 @@ func loadBlockQueryBind(db *sql.DB) {
}
rows, err := stmt.QueryContext(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
return
if lastLoggingTime.IsZero() || time.Since(lastLoggingTime) > defaultQBBTableMissingErrorLoggingInterval {
//In case table missing log alert event for every 2 hour
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
lastLoggingTime = time.Now()
return
} else {
return
}
}
defer rows.Close()

cfgLoad := QueryBindBlockerCfg{BySqlHash:make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}
cfgLoad := QueryBindBlockerCfg{BySqlHash: make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}

rowCount := 0
for rows.Next() {
Expand All @@ -182,9 +190,9 @@ func loadBlockQueryBind(db *sql.DB) {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker:", err)
continue
}

if len(entry.Herasqltext) < GetConfig().QueryBindBlockerMinSqlPrefix {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix," bytes or more - sqlhash:", entry.Herasqlhash)
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix, " bytes or more - sqlhash:", entry.Herasqlhash)
continue
}
rowCount++
Expand All @@ -200,7 +208,7 @@ func loadBlockQueryBind(db *sql.DB) {
}
bindVal, ok := bindName[entry.Bindvarvalue]
if !ok {
bindVal = make([]QueryBindBlockerEntry,0)
bindVal = make([]QueryBindBlockerEntry, 0)
bindName[entry.Bindvarvalue] = bindVal
}
bindName[entry.Bindvarvalue] = append(bindName[entry.Bindvarvalue], entry)
Expand Down
53 changes: 29 additions & 24 deletions tests/unittest/bindThrottle/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMain(m *testing.M) {
}

func sleepyQ(conn *sql.Conn, delayRow int) error {
stmt, err := conn.PrepareContext(context.Background(), "select * from sleep_info where ( seconds > sleep_option(?) or seconds > 0.0 )")
stmt, err := conn.PrepareContext(context.Background(), fmt.Sprintf("select * from sleep_info where ( seconds > sleep_option(?) or seconds > 0.0 ) and id=%d", delayRow))
if err != nil {
fmt.Printf("Error preparing sleepyQ %s\n", err.Error())
return err
Expand Down Expand Up @@ -129,19 +129,18 @@ func partialBadLoad(fracBad float64) error {
fmt.Printf("spawning clients bad%d norm%d\n", numBad, numNorm)
mkClients(numBad, &stop2, 29001111, "badClient", &badCliErr, db)
mkClients(numNorm, &stop3, 100, "normClient", &cliErr, db) // bind value is short, so bindevict won't trigger
time.Sleep(3100 * time.Millisecond)
//time.Sleep(33100 * time.Millisecond)
time.Sleep(3000 * time.Millisecond)

// start normal clients after initial backlog timeouts
var stop int
var normCliErrStr string
mkClients(1, &stop, 21001111, "n client", &normCliErrStr, db)
time.Sleep(2000 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)

// if we throttle down or stop, it restores
stop2 = 1 // stop bad clients
stop3 = 1
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second) //Make sure that clear throttle
conn, err := db.Conn(context.Background())
if err != nil {
fmt.Printf("Error conn %s\n", err.Error())
Expand Down Expand Up @@ -174,31 +173,31 @@ func mkClients(num int, stop *int, bindV int, grpName string, outErr *string, db
nowStr := time.Now().Format("15:04:05.000000 ")
if conn == nil {
conn, err = db.Conn(context.Background())
fmt.Printf(grpName+" connected %d\n", clientId)
fmt.Printf("%s connected %d\n", grpName, clientId)
if err != nil {
fmt.Printf(nowStr+grpName+" Error %d conn %s\n", clientId, err.Error())
fmt.Printf("%s %s Error %d conn %s\n", nowStr, grpName, clientId, err.Error())
time.Sleep(7 * time.Millisecond)
continue
}
}

fmt.Printf(nowStr+grpName+"%d loop%d %s\n", clientId, count, time.Now().Format("20060102j150405.000000"))
fmt.Printf("%s %s %d loop%d %s\n", nowStr, grpName, clientId, count, time.Now().Format("20060102j150405.000000"))
err := sleepyQ(conn, bindV)
if err != nil {
if err.Error() == curErr {
fmt.Printf(nowStr+grpName+"%d same err twice\n", clientId)
fmt.Printf("%s %s %d same err twice\n", nowStr, grpName, clientId)
conn.Close()
conn = nil
} else {
curErr = err.Error()
*outErr = curErr
fmt.Printf(nowStr+grpName+"%d err %s\n", clientId, curErr)
fmt.Printf("%s %s %d err %s\n", nowStr, grpName, clientId, curErr)
}
}
count++
time.Sleep(10 * time.Millisecond)
}
fmt.Printf(time.Now().Format("15:04:05.000000 ")+grpName+"%d END loop%d\n", clientId, count)
fmt.Printf("%s %s %d END loop%d\n", time.Now().Format("15:04:05.000000 "), grpName, clientId, count)
}(i)
}
}
Expand All @@ -220,18 +219,24 @@ func TestBindThrottle(t *testing.T) {
t.Fatal("backlog timeout or saturation was not triggered")
} // */

if true {
logger.GetLogger().Log(logger.Debug, "BindThrottle midpt +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
err := partialBadLoad(0.8)
if err != nil {
// t.Fatalf("main step function returned err %s", err.Error()) // can be triggered since test only has one sql
}
if testutil.RegexCountFile("BIND_THROTTLE", "cal.log") > 0 {
t.Fatalf("BIND_THROTTLE should not trigger with high default BindEvictionDecrPerSec value 10000")
}
if testutil.RegexCountFile("BIND_EVICT", "cal.log") == 0 {
t.Fatalf("BIND_EVICT should trigger")
}
} // endif
logger.GetLogger().Log(logger.Debug, "BindThrottle midpt +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
err = partialBadLoad(0.8)
if err != nil {
// t.Fatalf("main step function returned err %s", err.Error()) // can be triggered since test only has one sql
}
if testutil.RegexCountFile("BIND_THROTTLE", "cal.log") < 0 {
t.Fatalf("BIND_THROTTLE should trigger")
}
if testutil.RegexCountFile("BIND_EVICT", "cal.log") == 0 {
t.Fatalf("BIND_EVICT should trigger")
}

if testutil.RegexCountFile(".*BIND_EVICT\t1354401077\t1.*", "cal.log") < 1 {
t.Fatalf("BIND_EVICT should trigger for SQL HASH 1354401077")
}

if testutil.RegexCountFile(".*BIND_THROTTLE\t1354401077\t1.*", "cal.log") < 1 {
t.Fatalf("BIND_THROTTLE should trigger for SQL HASH 1354401077")
}
logger.GetLogger().Log(logger.Debug, "BindThrottle done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
} // */

0 comments on commit 948a25f

Please sign in to comment.