From d7a183668e014657f92a1fbe4a1e376c433ff0f7 Mon Sep 17 00:00:00 2001 From: Evangelos Skopelitis Date: Wed, 19 Jul 2023 13:21:24 -0400 Subject: [PATCH] backend: Add instance statistics table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A by-product of the Omaha update protocol employed by Nebraska delivers statistics on version spread, but accumulating and presenting this data suffers from a number of limitations: - The chart only represents all nodes’ current OS versions. We currently do not have historic data of the nodes’ previous software versions, and we would like to track of this data. - While historic data can be calculated from update events, this is computationally intense and unfeasible in practice. We currently cannot calculate or store this historic data efficiently, and we would like to be able to have quick and easy access to this data. - To be able to back-fill the live Nebraska server using historic data available in CSV format, we need to use a third-party tool. We currently cannot accomplish this using our existing setup, and we would like to implement a mechanism to do this natively. We implement the `instance_stats` table (based on instance fields from `instance_application`) to store data on current version spread in an efficient format optimized for querying. To accomplish this, there are five data points that we associate each instance with, serving as the main fields of this table: - Timestamp: Necessary for keeping track of when update checks are performed - Channel name: Necessary to identify an individual instance - Architecture: Necessary to identify an individual instance - Version: Necessary to identify an individual instance - Instance count: Necessary for grouping instances by the previous four fields New entries to the table will be generated using a background job scheduled periodically at a specified interval. --- backend/pkg/api/bindata.go | 23 +++ .../0019_add_instance_stats_table.sql | 14 ++ backend/pkg/api/instances.go | 163 ++++++++++++++++++ backend/pkg/api/instances_test.go | 102 +++++++++++ 4 files changed, 302 insertions(+) create mode 100644 backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql diff --git a/backend/pkg/api/bindata.go b/backend/pkg/api/bindata.go index 62763108a..25cad1fb2 100644 --- a/backend/pkg/api/bindata.go +++ b/backend/pkg/api/bindata.go @@ -20,6 +20,7 @@ // db/migrations/0016_add_version_breakdown_indexes.sql (734B) // db/migrations/0017_drop_unused_indexes.sql (297B) // db/migrations/0018_add_sha256_file_field.sql (173B) +// db/migrations/0019_add_instance_stats_table.sql (361B) package api @@ -487,6 +488,26 @@ func dbMigrations0018_add_sha256_file_fieldSql() (*asset, error) { return a, nil } +var _dbMigrations0019_add_instance_stats_tableSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x6c\x90\xcd\x4a\xc6\x30\x10\x45\xf7\xf3\x14\xb3\x4c\x30\x05\x11\xc4\x45\xd1\x95\xaf\xe0\xba\x8c\x71\xb4\xc1\x64\x52\x93\x69\x15\x9f\x5e\x5a\xe9\x8f\xf5\xdb\x0d\x9c\x7b\x93\xc3\x6d\x1a\xbc\x4a\xe1\xad\x90\x32\x3e\x0d\x00\xbe\xf0\x7c\x2a\x3d\x47\xc6\xf0\x8a\x92\x15\xf9\x2b\x54\xad\x18\xa4\x2a\x89\xe7\xae\x2a\x69\x45\x03\x88\x88\x1a\x12\x57\xa5\x34\xec\x97\x7e\x2f\x2d\x19\x63\x74\x4b\xc6\xf7\x24\xc2\xb1\x13\x4a\x8c\x13\x15\xdf\x53\x31\x37\xb7\xf6\x14\x9b\xc1\x86\xef\xce\x74\xe2\x52\x43\x96\x43\xff\xdf\x03\xab\xe0\xac\xaa\x1b\x43\xdf\xb3\x7f\x47\xb3\xd3\x87\x7b\xbc\xb6\xbf\x95\x51\xc2\xc7\xc8\x66\x73\x77\x7f\x64\xdd\xe2\xe4\xd6\xbf\x2d\xd8\x16\xe0\xb8\xd8\x63\xfe\x14\x80\x97\x92\x87\x7d\xb1\x8b\x6b\xb5\xf0\x13\x00\x00\xff\xff\xdc\x2b\xd0\xda\x69\x01\x00\x00") + +func dbMigrations0019_add_instance_stats_tableSqlBytes() ([]byte, error) { + return bindataRead( + _dbMigrations0019_add_instance_stats_tableSql, + "db/migrations/0019_add_instance_stats_table.sql", + ) +} + +func dbMigrations0019_add_instance_stats_tableSql() (*asset, error) { + bytes, err := dbMigrations0019_add_instance_stats_tableSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "db/migrations/0019_add_instance_stats_table.sql", size: 361, mode: os.FileMode(0644), modTime: time.Unix(1, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x58, 0x78, 0xb3, 0x28, 0xce, 0xbb, 0xe7, 0x2, 0x5e, 0xaa, 0xdc, 0x2a, 0x88, 0xe6, 0x60, 0x28, 0x95, 0x87, 0x4a, 0x2d, 0x6e, 0x73, 0x9c, 0x5c, 0x11, 0x23, 0xb0, 0xbb, 0x3e, 0x60, 0x4c, 0x76}} + return a, nil +} + // Asset loads and returns the asset for the given name. // It returns an error if the asset could not be found or // could not be loaded. @@ -598,6 +619,7 @@ var _bindata = map[string]func() (*asset, error){ "db/migrations/0016_add_version_breakdown_indexes.sql": dbMigrations0016_add_version_breakdown_indexesSql, "db/migrations/0017_drop_unused_indexes.sql": dbMigrations0017_drop_unused_indexesSql, "db/migrations/0018_add_sha256_file_field.sql": dbMigrations0018_add_sha256_file_fieldSql, + "db/migrations/0019_add_instance_stats_table.sql": dbMigrations0019_add_instance_stats_tableSql, } // AssetDebug is true if the assets were built with the debug flag enabled. @@ -667,6 +689,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "0016_add_version_breakdown_indexes.sql": {dbMigrations0016_add_version_breakdown_indexesSql, map[string]*bintree{}}, "0017_drop_unused_indexes.sql": {dbMigrations0017_drop_unused_indexesSql, map[string]*bintree{}}, "0018_add_sha256_file_field.sql": {dbMigrations0018_add_sha256_file_fieldSql, map[string]*bintree{}}, + "0019_add_instance_stats_table.sql": {dbMigrations0019_add_instance_stats_tableSql, map[string]*bintree{}}, }}, "sample_data.sql": {dbSample_dataSql, map[string]*bintree{}}, }}, diff --git a/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql b/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql new file mode 100644 index 000000000..85283d286 --- /dev/null +++ b/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql @@ -0,0 +1,14 @@ +-- +migrate Up + +create table if not exists instance_stats ( + timestamp timestamptz not null, + channel_name varchar(25) not null, + arch varchar(7) not null, + version varchar(255) not null, + instances int not null check (instances >= 0), + unique(timestamp, channel_name, arch, version) +); + +-- +migrate Down + +drop table if exists instance_stats; diff --git a/backend/pkg/api/instances.go b/backend/pkg/api/instances.go index 3480f0b7e..85d0e7d26 100644 --- a/backend/pkg/api/instances.go +++ b/backend/pkg/api/instances.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "strconv" + "strings" "time" "github.com/doug-martin/goqu/v9" @@ -48,6 +49,7 @@ const ( const ( validityInterval postgresDuration = "1 days" + defaultInterval time.Duration = 2 * time.Hour ) // Instance represents an instance running one or more applications for which @@ -108,6 +110,14 @@ type InstancesQueryParams struct { SearchValue string `json:"search_value"` } +type InstanceStats struct { + Timestamp time.Time `db:"timestamp" json:"timestamp"` + ChannelName string `db:"channel_name" json:"channel_name"` + Arch string `db:"arch" json:"arch"` + Version string `db:"version" json:"version"` + Instances int `db:"instances" json:"instances"` +} + type instanceFilterItem int const ( @@ -631,3 +641,156 @@ func (api *API) instanceStatusHistoryQuery(instanceID, appID, groupID string, li Order(goqu.C("created_ts").Desc()). Limit(uint(limit)) } + +// instanceStatsQuery returns a SelectDataset prepared to return all instances +// that have been checked in during a given duration from a given time. +func (api *API) instanceStatsQuery(t *time.Time, duration *time.Duration) *goqu.SelectDataset { + if t == nil { + now := time.Now() + t = &now + } + + if duration == nil { + d := defaultInterval + duration = &d + } + + // Helper function to convert duration to PostgreSQL interval string + durationToInterval := func(d time.Duration) string { + if d <= 0 { + d = time.Microsecond + } + + parts := []string{} + + hours := int(d.Hours()) + if hours != 0 { + parts = append(parts, fmt.Sprintf("%d hours", hours)) + } + + remainder := d - time.Duration(hours)*time.Hour + minutes := int(remainder.Minutes()) + if minutes != 0 { + parts = append(parts, fmt.Sprintf("%d minutes", minutes)) + } + + remainder -= time.Duration(minutes) * time.Minute + seconds := int(remainder.Seconds()) + if seconds != 0 { + parts = append(parts, fmt.Sprintf("%d seconds", seconds)) + } + + remainder -= time.Duration(seconds) * time.Second + microseconds := remainder.Microseconds() + if microseconds != 0 { + parts = append(parts, fmt.Sprintf("%d microseconds", microseconds)) + } + + return strings.Join(parts, " ") + } + + interval := durationToInterval(*duration) + timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00"))) + timestampMinusDuration := goqu.L("timestamp ? - interval ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")), interval) + + query := goqu.From(goqu.T("instance_application")). + Select( + timestamp, + goqu.T("channel").Col("name").As("channel_name"), + goqu.Case(). + When(goqu.T("channel").Col("arch").Eq(1), "AMD64"). + When(goqu.T("channel").Col("arch").Eq(2), "ARM"). + Else(""). + As("arch"), + goqu.C("version").As("version"), + goqu.COUNT("*").As("instances")). + Join(goqu.T("groups"), goqu.On(goqu.C("group_id").Eq(goqu.T("groups").Col("id")))). + Join(goqu.T("channel"), goqu.On(goqu.T("groups").Col("channel_id").Eq(goqu.T("channel").Col("id")))). + Where( + goqu.C("last_check_for_updates").Gt(timestampMinusDuration), + goqu.C("last_check_for_updates").Lte(timestamp)). + GroupBy(timestamp, + goqu.T("channel").Col("name"), + goqu.T("channel").Col("arch"), + goqu.C("version")). + Order(timestamp.Asc()) + + return query +} + +// GetInstanceStats returns an InstanceStats table with all instances that have +// been previously been checked in. +func (api *API) GetInstanceStats() ([]InstanceStats, error) { + query, _, err := goqu.From("instance_stats"). + Order(goqu.C("timestamp").Asc()).ToSQL() + if err != nil { + return nil, err + } + + rows, err := api.db.Queryx(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var instances []InstanceStats + for rows.Next() { + var instance InstanceStats + err = rows.StructScan(&instance) + if err != nil { + return nil, err + } + instances = append(instances, instance) + } + + return instances, nil +} + +// GetInstanceStatsByTimestamp returns an InstanceStats array of instances matching a +// given timestamp value, ordered by version. +func (api *API) GetInstanceStatsByTimestamp(t time.Time) ([]InstanceStats, error) { + timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00"))) + + query, _, err := goqu.From("instance_stats"). + Where(goqu.C("timestamp").Eq(timestamp)). + Order(goqu.C("version").Asc()).ToSQL() + if err != nil { + return nil, err + } + + rows, err := api.db.Queryx(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var instances []InstanceStats + for rows.Next() { + var instance InstanceStats + err = rows.StructScan(&instance) + if err != nil { + return nil, err + } + instances = append(instances, instance) + } + + return instances, nil +} + +// updateInstanceStats updates the instance_stats table with instances checked +// in during a given duration from a given time. +func (api *API) updateInstanceStats(t *time.Time, duration *time.Duration) error { + insertQuery, _, err := goqu.Insert(goqu.T("instance_stats")). + Cols("timestamp", "channel_name", "arch", "version", "instances"). + FromQuery(api.instanceStatsQuery(t, duration)). + ToSQL() + if err != nil { + return err + } + + _, err = api.db.Exec(insertQuery) + if err != nil { + return err + } + return nil +} diff --git a/backend/pkg/api/instances_test.go b/backend/pkg/api/instances_test.go index d40d85107..296dc4d5f 100644 --- a/backend/pkg/api/instances_test.go +++ b/backend/pkg/api/instances_test.go @@ -3,6 +3,7 @@ package api import ( "fmt" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -294,3 +295,104 @@ func TestGetInstanceStatusHistory(t *testing.T) { assert.Equal(t, history[3].Status, InstanceStatusUpdateGranted) assert.Equal(t, history[3].Version, "1.0.1") } + +func TestUpdateInstanceStats(t *testing.T) { + a := newForTest(t) + defer a.Close() + + instances, err := a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 0, len(instances)) + + // First test case: Create tInstance1, tInstance2, and tInstance3; check tInstance1 twice; switch tInstance2 version + start := time.Now().UTC() + + tTeam, _ := a.AddTeam(&Team{Name: "test_team"}) + tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID}) + tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID, Arch: ArchAMD64}) + tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID), Arch: ArchAMD64}) + tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"}) + tInstance1, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + tInstance2, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.2", "1.0.0", tApp.ID, tGroup.ID) + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.3", "1.0.1", tApp.ID, tGroup.ID) + + _, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.1", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + ts := time.Now().UTC() + elapsed := ts.Sub(start) + + err = a.updateInstanceStats(&ts, &elapsed) + assert.NoError(t, err) + + instances, err = a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 2, len(instances)) + + instanceStats, err := a.GetInstanceStatsByTimestamp(ts) + assert.NoError(t, err) + assert.Equal(t, 2, len(instanceStats)) + assert.Equal(t, "1.0.0", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) + assert.Equal(t, "1.0.1", instanceStats[1].Version) + assert.Equal(t, 2, instanceStats[1].Instances) + + // Next test case: Switch tInstance1 and tInstance2 versions to workaround the 5-minutes-rate-limiting of the check-in time and add new instance + ts2 := time.Now().UTC() + + _, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.3", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.4", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.4", "1.0.5", tApp.ID, tGroup.ID) + + ts3 := time.Now().UTC() + elapsed = ts3.Sub(ts2) + + err = a.updateInstanceStats(&ts3, &elapsed) + assert.NoError(t, err) + + instances, err = a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 5, len(instances)) + + instanceStats, err = a.GetInstanceStatsByTimestamp(ts3) + assert.NoError(t, err) + assert.Equal(t, 3, len(instanceStats)) + assert.Equal(t, "1.0.3", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) + assert.Equal(t, "1.0.4", instanceStats[1].Version) + assert.Equal(t, 1, instanceStats[1].Instances) + assert.Equal(t, "1.0.5", instanceStats[2].Version) + assert.Equal(t, 1, instanceStats[2].Instances) +} + +func TestUpdateInstanceStatsNoArch(t *testing.T) { + a := newForTest(t) + defer a.Close() + + tTeam, _ := a.AddTeam(&Team{Name: "test_team"}) + tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID}) + tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID}) + tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID)}) + tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"}) + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + + ts := time.Now().UTC() + // Use large duration to have some test coverage for durationToInterval + elapsed := 3*time.Hour + 45*time.Minute + 30*time.Second + 1000*time.Microsecond + + err := a.updateInstanceStats(&ts, &elapsed) + assert.NoError(t, err) + + instanceStats, err := a.GetInstanceStatsByTimestamp(ts) + assert.NoError(t, err) + assert.Equal(t, "", instanceStats[0].Arch) + assert.Equal(t, "1.0.0", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) +}