Skip to content

Commit

Permalink
Merge pull request #656 from flatcar/skoeva/db_vs
Browse files Browse the repository at this point in the history
Add instance statistics table
  • Loading branch information
pothos committed Jul 19, 2023
2 parents fbb1fb9 + d7a1836 commit 25d24f1
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 0 deletions.
23 changes: 23 additions & 0 deletions backend/pkg/api/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- +migrate Up

create table if not exists instance_stats (
timestamp timestamptz not null,
channel_name varchar(25) not null,
arch varchar(7) not null,
version varchar(255) not null,
instances int not null check (instances >= 0),
unique(timestamp, channel_name, arch, version)
);

-- +migrate Down

drop table if exists instance_stats;
163 changes: 163 additions & 0 deletions backend/pkg/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/doug-martin/goqu/v9"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (

const (
validityInterval postgresDuration = "1 days"
defaultInterval time.Duration = 2 * time.Hour
)

// Instance represents an instance running one or more applications for which
Expand Down Expand Up @@ -108,6 +110,14 @@ type InstancesQueryParams struct {
SearchValue string `json:"search_value"`
}

type InstanceStats struct {
Timestamp time.Time `db:"timestamp" json:"timestamp"`
ChannelName string `db:"channel_name" json:"channel_name"`
Arch string `db:"arch" json:"arch"`
Version string `db:"version" json:"version"`
Instances int `db:"instances" json:"instances"`
}

type instanceFilterItem int

const (
Expand Down Expand Up @@ -631,3 +641,156 @@ func (api *API) instanceStatusHistoryQuery(instanceID, appID, groupID string, li
Order(goqu.C("created_ts").Desc()).
Limit(uint(limit))
}

// instanceStatsQuery returns a SelectDataset prepared to return all instances
// that have been checked in during a given duration from a given time.
func (api *API) instanceStatsQuery(t *time.Time, duration *time.Duration) *goqu.SelectDataset {
if t == nil {
now := time.Now()
t = &now
}

if duration == nil {
d := defaultInterval
duration = &d
}

// Helper function to convert duration to PostgreSQL interval string
durationToInterval := func(d time.Duration) string {
if d <= 0 {
d = time.Microsecond
}

parts := []string{}

hours := int(d.Hours())
if hours != 0 {
parts = append(parts, fmt.Sprintf("%d hours", hours))
}

remainder := d - time.Duration(hours)*time.Hour
minutes := int(remainder.Minutes())
if minutes != 0 {
parts = append(parts, fmt.Sprintf("%d minutes", minutes))
}

remainder -= time.Duration(minutes) * time.Minute
seconds := int(remainder.Seconds())
if seconds != 0 {
parts = append(parts, fmt.Sprintf("%d seconds", seconds))
}

remainder -= time.Duration(seconds) * time.Second
microseconds := remainder.Microseconds()
if microseconds != 0 {
parts = append(parts, fmt.Sprintf("%d microseconds", microseconds))
}

return strings.Join(parts, " ")
}

interval := durationToInterval(*duration)
timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")))
timestampMinusDuration := goqu.L("timestamp ? - interval ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")), interval)

query := goqu.From(goqu.T("instance_application")).
Select(
timestamp,
goqu.T("channel").Col("name").As("channel_name"),
goqu.Case().
When(goqu.T("channel").Col("arch").Eq(1), "AMD64").
When(goqu.T("channel").Col("arch").Eq(2), "ARM").
Else("").
As("arch"),
goqu.C("version").As("version"),
goqu.COUNT("*").As("instances")).
Join(goqu.T("groups"), goqu.On(goqu.C("group_id").Eq(goqu.T("groups").Col("id")))).
Join(goqu.T("channel"), goqu.On(goqu.T("groups").Col("channel_id").Eq(goqu.T("channel").Col("id")))).
Where(
goqu.C("last_check_for_updates").Gt(timestampMinusDuration),
goqu.C("last_check_for_updates").Lte(timestamp)).
GroupBy(timestamp,
goqu.T("channel").Col("name"),
goqu.T("channel").Col("arch"),
goqu.C("version")).
Order(timestamp.Asc())

return query
}

// GetInstanceStats returns an InstanceStats table with all instances that have
// been previously been checked in.
func (api *API) GetInstanceStats() ([]InstanceStats, error) {
query, _, err := goqu.From("instance_stats").
Order(goqu.C("timestamp").Asc()).ToSQL()
if err != nil {
return nil, err
}

rows, err := api.db.Queryx(query)
if err != nil {
return nil, err
}
defer rows.Close()

var instances []InstanceStats
for rows.Next() {
var instance InstanceStats
err = rows.StructScan(&instance)
if err != nil {
return nil, err
}
instances = append(instances, instance)
}

return instances, nil
}

// GetInstanceStatsByTimestamp returns an InstanceStats array of instances matching a
// given timestamp value, ordered by version.
func (api *API) GetInstanceStatsByTimestamp(t time.Time) ([]InstanceStats, error) {
timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")))

query, _, err := goqu.From("instance_stats").
Where(goqu.C("timestamp").Eq(timestamp)).
Order(goqu.C("version").Asc()).ToSQL()
if err != nil {
return nil, err
}

rows, err := api.db.Queryx(query)
if err != nil {
return nil, err
}
defer rows.Close()

var instances []InstanceStats
for rows.Next() {
var instance InstanceStats
err = rows.StructScan(&instance)
if err != nil {
return nil, err
}
instances = append(instances, instance)
}

return instances, nil
}

// updateInstanceStats updates the instance_stats table with instances checked
// in during a given duration from a given time.
func (api *API) updateInstanceStats(t *time.Time, duration *time.Duration) error {
insertQuery, _, err := goqu.Insert(goqu.T("instance_stats")).
Cols("timestamp", "channel_name", "arch", "version", "instances").
FromQuery(api.instanceStatsQuery(t, duration)).
ToSQL()
if err != nil {
return err
}

_, err = api.db.Exec(insertQuery)
if err != nil {
return err
}
return nil
}
102 changes: 102 additions & 0 deletions backend/pkg/api/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -294,3 +295,104 @@ func TestGetInstanceStatusHistory(t *testing.T) {
assert.Equal(t, history[3].Status, InstanceStatusUpdateGranted)
assert.Equal(t, history[3].Version, "1.0.1")
}

func TestUpdateInstanceStats(t *testing.T) {
a := newForTest(t)
defer a.Close()

instances, err := a.GetInstanceStats()
assert.NoError(t, err)
assert.Equal(t, 0, len(instances))

// First test case: Create tInstance1, tInstance2, and tInstance3; check tInstance1 twice; switch tInstance2 version
start := time.Now().UTC()

tTeam, _ := a.AddTeam(&Team{Name: "test_team"})
tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID})
tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID, Arch: ArchAMD64})
tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID), Arch: ArchAMD64})
tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"})
tInstance1, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)
tInstance2, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.2", "1.0.0", tApp.ID, tGroup.ID)
_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.3", "1.0.1", tApp.ID, tGroup.ID)

_, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)
assert.NoError(t, err)

_, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.1", tApp.ID, tGroup.ID)
assert.NoError(t, err)

ts := time.Now().UTC()
elapsed := ts.Sub(start)

err = a.updateInstanceStats(&ts, &elapsed)
assert.NoError(t, err)

instances, err = a.GetInstanceStats()
assert.NoError(t, err)
assert.Equal(t, 2, len(instances))

instanceStats, err := a.GetInstanceStatsByTimestamp(ts)
assert.NoError(t, err)
assert.Equal(t, 2, len(instanceStats))
assert.Equal(t, "1.0.0", instanceStats[0].Version)
assert.Equal(t, 1, instanceStats[0].Instances)
assert.Equal(t, "1.0.1", instanceStats[1].Version)
assert.Equal(t, 2, instanceStats[1].Instances)

// Next test case: Switch tInstance1 and tInstance2 versions to workaround the 5-minutes-rate-limiting of the check-in time and add new instance
ts2 := time.Now().UTC()

_, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.3", tApp.ID, tGroup.ID)
assert.NoError(t, err)

_, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.4", tApp.ID, tGroup.ID)
assert.NoError(t, err)

_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.4", "1.0.5", tApp.ID, tGroup.ID)

ts3 := time.Now().UTC()
elapsed = ts3.Sub(ts2)

err = a.updateInstanceStats(&ts3, &elapsed)
assert.NoError(t, err)

instances, err = a.GetInstanceStats()
assert.NoError(t, err)
assert.Equal(t, 5, len(instances))

instanceStats, err = a.GetInstanceStatsByTimestamp(ts3)
assert.NoError(t, err)
assert.Equal(t, 3, len(instanceStats))
assert.Equal(t, "1.0.3", instanceStats[0].Version)
assert.Equal(t, 1, instanceStats[0].Instances)
assert.Equal(t, "1.0.4", instanceStats[1].Version)
assert.Equal(t, 1, instanceStats[1].Instances)
assert.Equal(t, "1.0.5", instanceStats[2].Version)
assert.Equal(t, 1, instanceStats[2].Instances)
}

func TestUpdateInstanceStatsNoArch(t *testing.T) {
a := newForTest(t)
defer a.Close()

tTeam, _ := a.AddTeam(&Team{Name: "test_team"})
tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID})
tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID})
tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID)})
tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"})
_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)

ts := time.Now().UTC()
// Use large duration to have some test coverage for durationToInterval
elapsed := 3*time.Hour + 45*time.Minute + 30*time.Second + 1000*time.Microsecond

err := a.updateInstanceStats(&ts, &elapsed)
assert.NoError(t, err)

instanceStats, err := a.GetInstanceStatsByTimestamp(ts)
assert.NoError(t, err)
assert.Equal(t, "", instanceStats[0].Arch)
assert.Equal(t, "1.0.0", instanceStats[0].Version)
assert.Equal(t, 1, instanceStats[0].Instances)
}

0 comments on commit 25d24f1

Please sign in to comment.