diff --git a/storage/config.go b/storage/config.go index 2977308f..5ee3539d 100644 --- a/storage/config.go +++ b/storage/config.go @@ -19,6 +19,12 @@ type Config struct { // Type of store // If blank, uses the default in-memory store Type Type `yaml:"type"` + + // Caching is whether to enable caching. + // This is used to drastically decrease read latency by pre-emptively caching writes + // as they happen, also known as the write-through caching strategy. + // Does not apply if Config.Type is not TypePostgres or TypeSQLite. + Caching bool `yaml:"caching,omitempty"` } // ValidateAndSetDefaults validates the configuration and sets the default values (if applicable) diff --git a/storage/store/sql/sql.go b/storage/store/sql/sql.go index e3d47009..152e42c5 100644 --- a/storage/store/sql/sql.go +++ b/storage/store/sql/sql.go @@ -13,6 +13,7 @@ import ( "github.com/TwiN/gatus/v4/storage/store/common" "github.com/TwiN/gatus/v4/storage/store/common/paging" "github.com/TwiN/gatus/v4/util" + "github.com/TwiN/gocache/v2" _ "github.com/lib/pq" _ "modernc.org/sqlite" ) @@ -32,6 +33,8 @@ const ( resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a clean up uptimeRetention = 7 * 24 * time.Hour + + cacheTTL = 10 * time.Minute ) var ( @@ -49,10 +52,14 @@ type Store struct { driver, path string db *sql.DB + + // writeThroughCache is a cache used to drastically decrease read latency by pre-emptively + // caching writes as they happen. If nil, writes are not cached. + writeThroughCache *gocache.Cache } // NewStore initializes the database and creates the schema if it doesn't already exist in the path specified -func NewStore(driver, path string) (*Store, error) { +func NewStore(driver, path string, caching bool) (*Store, error) { if len(driver) == 0 { return nil, ErrDatabaseDriverNotSpecified } @@ -79,6 +86,9 @@ func NewStore(driver, path string) (*Store, error) { _ = store.db.Close() return nil, err } + if caching { + store.writeThroughCache = gocache.NewCache().WithMaxSize(10000) + } return store, nil } @@ -323,6 +333,20 @@ func (s *Store) Insert(endpoint *core.Endpoint, result *core.Result) error { } } } + // TODO: add field to automatically refresh the cache when a new result is inserted + if s.writeThroughCache != nil { + cacheKeysToRefresh := s.writeThroughCache.GetKeysByPattern(endpoint.Key()+"*", 0) + for _, cacheKey := range cacheKeysToRefresh { + s.writeThroughCache.Delete(cacheKey) + endpointKey, params, err := extractKeyAndParamsFromCacheKey(cacheKey) + if err != nil { + log.Printf("[sql][Insert] Silently deleting cache key %s instead of refreshing due to error: %s", cacheKey, err.Error()) + continue + } + // Retrieve the endpoint status by key, which will in turn refresh the cache + _, _ = s.getEndpointStatusByKey(tx, endpointKey, params) + } + } if err = tx.Commit(); err != nil { _ = tx.Rollback() } @@ -350,6 +374,11 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int { log.Printf("[sql][DeleteAllEndpointStatusesNotInKeys] Failed to delete rows that do not belong to any of keys=%v: %s", keys, err.Error()) return 0 } + if s.writeThroughCache != nil { + // It's easier to just wipe out the entire cache than to try to find all keys that are not in the keys list + _ = s.writeThroughCache.DeleteKeysByPattern("*") + } + // Return number of rows deleted rowsAffects, _ := result.RowsAffected() return int(rowsAffects) } @@ -357,6 +386,9 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int { // Clear deletes everything from the store func (s *Store) Clear() { _, _ = s.db.Exec("DELETE FROM endpoints") + if s.writeThroughCache != nil { + _ = s.writeThroughCache.DeleteKeysByPattern("*") + } } // Save does nothing, because this store is immediately persistent. @@ -367,6 +399,10 @@ func (s *Store) Save() error { // Close the database handle func (s *Store) Close() { _ = s.db.Close() + if s.writeThroughCache != nil { + // Clear the cache too. If the store's been closed, we don't want to keep the cache around. + _ = s.writeThroughCache.DeleteKeysByPattern("*") + } } // insertEndpoint inserts an endpoint in the store and returns the generated id of said endpoint @@ -479,6 +515,15 @@ func (s *Store) getAllEndpointKeys(tx *sql.Tx) (keys []string, err error) { } func (s *Store) getEndpointStatusByKey(tx *sql.Tx, key string, parameters *paging.EndpointStatusParams) (*core.EndpointStatus, error) { + var cacheKey string + if s.writeThroughCache != nil { + cacheKey = generateCacheKey(key, parameters) + if cachedEndpointStatus, exists := s.writeThroughCache.Get(cacheKey); exists { + if castedCachedEndpointStatus, ok := cachedEndpointStatus.(*core.EndpointStatus); ok { + return castedCachedEndpointStatus, nil + } + } + } endpointID, group, endpointName, err := s.getEndpointIDGroupAndNameByKey(tx, key) if err != nil { return nil, err @@ -494,6 +539,9 @@ func (s *Store) getEndpointStatusByKey(tx *sql.Tx, key string, parameters *pagin log.Printf("[sql][getEndpointStatusByKey] Failed to retrieve results for key=%s: %s", key, err.Error()) } } + if s.writeThroughCache != nil { + s.writeThroughCache.SetWithTTL(cacheKey, endpointStatus, cacheTTL) + } return endpointStatus, nil } @@ -788,3 +836,29 @@ func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, endpointID int64, maxAge time _, err := tx.Exec("DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2", endpointID, maxAge.Unix()) return err } + +func generateCacheKey(endpointKey string, p *paging.EndpointStatusParams) string { + return fmt.Sprintf("%s-%d-%d-%d-%d", endpointKey, p.EventsPage, p.EventsPageSize, p.ResultsPage, p.ResultsPageSize) +} + +func extractKeyAndParamsFromCacheKey(cacheKey string) (string, *paging.EndpointStatusParams, error) { + parts := strings.Split(cacheKey, "-") + if len(parts) < 5 { + return "", nil, fmt.Errorf("invalid cache key: %s", cacheKey) + } + params := &paging.EndpointStatusParams{} + var err error + if params.EventsPage, err = strconv.Atoi(parts[len(parts)-4]); err != nil { + return "", nil, fmt.Errorf("invalid cache key: %s", err.Error()) + } + if params.EventsPageSize, err = strconv.Atoi(parts[len(parts)-3]); err != nil { + return "", nil, fmt.Errorf("invalid cache key: %s", err.Error()) + } + if params.ResultsPage, err = strconv.Atoi(parts[len(parts)-2]); err != nil { + return "", nil, fmt.Errorf("invalid cache key: %s", err.Error()) + } + if params.ResultsPageSize, err = strconv.Atoi(parts[len(parts)-1]); err != nil { + return "", nil, fmt.Errorf("invalid cache key: %s", err.Error()) + } + return strings.Join(parts[:len(parts)-4], "-"), params, nil +} diff --git a/storage/store/sql/sql_test.go b/storage/store/sql/sql_test.go index 02568f3c..cb4f4371 100644 --- a/storage/store/sql/sql_test.go +++ b/storage/store/sql/sql_test.go @@ -81,13 +81,13 @@ var ( ) func TestNewStore(t *testing.T) { - if _, err := NewStore("", "TestNewStore.db"); err != ErrDatabaseDriverNotSpecified { + if _, err := NewStore("", "TestNewStore.db", false); err != ErrDatabaseDriverNotSpecified { t.Error("expected error due to blank driver parameter") } - if _, err := NewStore("sqlite", ""); err != ErrPathNotSpecified { + if _, err := NewStore("sqlite", "", false); err != ErrPathNotSpecified { t.Error("expected error due to blank path parameter") } - if store, err := NewStore("sqlite", t.TempDir()+"/TestNewStore.db"); err != nil { + if store, err := NewStore("sqlite", t.TempDir()+"/TestNewStore.db", false); err != nil { t.Error("shouldn't have returned any error, got", err.Error()) } else { _ = store.db.Close() @@ -95,7 +95,7 @@ func TestNewStore(t *testing.T) { } func TestStore_InsertCleansUpOldUptimeEntriesProperly(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertCleansUpOldUptimeEntriesProperly.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertCleansUpOldUptimeEntriesProperly.db", false) defer store.Close() now := time.Now().Round(time.Minute) now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -152,7 +152,7 @@ func TestStore_InsertCleansUpOldUptimeEntriesProperly(t *testing.T) { } func TestStore_InsertCleansUpEventsAndResultsProperly(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertCleansUpEventsAndResultsProperly.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertCleansUpEventsAndResultsProperly.db", false) defer store.Close() for i := 0; i < resultsCleanUpThreshold+eventsCleanUpThreshold; i++ { store.Insert(&testEndpoint, &testSuccessfulResult) @@ -170,7 +170,7 @@ func TestStore_InsertCleansUpEventsAndResultsProperly(t *testing.T) { func TestStore_Persistence(t *testing.T) { path := t.TempDir() + "/TestStore_Persistence.db" - store, _ := NewStore("sqlite", path) + store, _ := NewStore("sqlite", path, false) store.Insert(&testEndpoint, &testSuccessfulResult) store.Insert(&testEndpoint, &testUnsuccessfulResult) if uptime, _ := store.GetUptimeByKey(testEndpoint.Key(), time.Now().Add(-time.Hour), time.Now()); uptime != 0.5 { @@ -188,7 +188,7 @@ func TestStore_Persistence(t *testing.T) { t.Fatal("sanity check failed") } store.Close() - store, _ = NewStore("sqlite", path) + store, _ = NewStore("sqlite", path, false) defer store.Close() ssFromNewStore, _ := store.GetEndpointStatus(testEndpoint.Group, testEndpoint.Name, paging.NewEndpointStatusParams().WithResults(1, common.MaximumNumberOfResults).WithEvents(1, common.MaximumNumberOfEvents)) if ssFromNewStore == nil || ssFromNewStore.Group != "group" || ssFromNewStore.Name != "name" || len(ssFromNewStore.Events) != 3 || len(ssFromNewStore.Results) != 2 { @@ -252,7 +252,7 @@ func TestStore_Persistence(t *testing.T) { } func TestStore_Save(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_Save.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_Save.db", false) defer store.Close() if store.Save() != nil { t.Error("Save shouldn't do anything for this store") @@ -262,7 +262,7 @@ func TestStore_Save(t *testing.T) { // Note that are much more extensive tests in /storage/store/store_test.go. // This test is simply an extra sanity check func TestStore_SanityCheck(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_SanityCheck.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_SanityCheck.db", false) defer store.Close() store.Insert(&testEndpoint, &testSuccessfulResult) endpointStatuses, _ := store.GetAllEndpointStatuses(paging.NewEndpointStatusParams()) @@ -306,7 +306,7 @@ func TestStore_SanityCheck(t *testing.T) { // TestStore_InvalidTransaction tests what happens if an invalid transaction is passed as parameter func TestStore_InvalidTransaction(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InvalidTransaction.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InvalidTransaction.db", false) defer store.Close() tx, _ := store.db.Begin() tx.Commit() @@ -364,7 +364,7 @@ func TestStore_InvalidTransaction(t *testing.T) { } func TestStore_NoRows(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_NoRows.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_NoRows.db", false) defer store.Close() tx, _ := store.db.Begin() defer tx.Rollback() @@ -378,7 +378,7 @@ func TestStore_NoRows(t *testing.T) { // This tests very unlikely cases where a table is deleted. func TestStore_BrokenSchema(t *testing.T) { - store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_BrokenSchema.db") + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_BrokenSchema.db", false) defer store.Close() if err := store.Insert(&testEndpoint, &testSuccessfulResult); err != nil { t.Fatal("expected no error, got", err.Error()) @@ -391,6 +391,7 @@ func TestStore_BrokenSchema(t *testing.T) { } // Break _, _ = store.db.Exec("DROP TABLE endpoints") + // And now we'll try to insert something in our broken schema if err := store.Insert(&testEndpoint, &testSuccessfulResult); err == nil { t.Fatal("expected an error") } @@ -477,3 +478,89 @@ func TestStore_BrokenSchema(t *testing.T) { t.Fatal("expected an error") } } + +func TestCacheKey(t *testing.T) { + scenarios := []struct { + endpointKey string + params paging.EndpointStatusParams + overrideCacheKey string + expectedCacheKey string + wantErr bool + }{ + { + endpointKey: "simple", + params: paging.EndpointStatusParams{EventsPage: 1, EventsPageSize: 2, ResultsPage: 3, ResultsPageSize: 4}, + expectedCacheKey: "simple-1-2-3-4", + wantErr: false, + }, + { + endpointKey: "with-hyphen", + params: paging.EndpointStatusParams{EventsPage: 0, EventsPageSize: 0, ResultsPage: 1, ResultsPageSize: 20}, + expectedCacheKey: "with-hyphen-0-0-1-20", + wantErr: false, + }, + { + endpointKey: "with-multiple-hyphens", + params: paging.EndpointStatusParams{EventsPage: 0, EventsPageSize: 0, ResultsPage: 2, ResultsPageSize: 20}, + expectedCacheKey: "with-multiple-hyphens-0-0-2-20", + wantErr: false, + }, + { + overrideCacheKey: "invalid-a-2-3-4", + wantErr: true, + }, + { + overrideCacheKey: "invalid-1-a-3-4", + wantErr: true, + }, + { + overrideCacheKey: "invalid-1-2-a-4", + wantErr: true, + }, + { + overrideCacheKey: "invalid-1-2-3-a", + wantErr: true, + }, + { + overrideCacheKey: "notenoughhyphen1-2-3-4", + wantErr: true, + }, + } + for _, scenario := range scenarios { + t.Run(scenario.expectedCacheKey+scenario.overrideCacheKey, func(t *testing.T) { + var cacheKey string + if len(scenario.overrideCacheKey) > 0 { + cacheKey = scenario.overrideCacheKey + } else { + cacheKey = generateCacheKey(scenario.endpointKey, &scenario.params) + if cacheKey != scenario.expectedCacheKey { + t.Errorf("expected %s, got %s", scenario.expectedCacheKey, cacheKey) + } + } + extractedEndpointKey, extractedParams, err := extractKeyAndParamsFromCacheKey(cacheKey) + if (err != nil) != scenario.wantErr { + t.Errorf("expected error %v, got %v", scenario.wantErr, err) + return + } + if err != nil { + // If there's an error, we don't need to check the extracted values + return + } + if extractedEndpointKey != scenario.endpointKey { + t.Errorf("expected endpointKey %s, got %s", scenario.endpointKey, extractedEndpointKey) + } + if extractedParams.EventsPage != scenario.params.EventsPage { + t.Errorf("expected EventsPage %d, got %d", scenario.params.EventsPage, extractedParams.EventsPage) + } + if extractedParams.EventsPageSize != scenario.params.EventsPageSize { + t.Errorf("expected EventsPageSize %d, got %d", scenario.params.EventsPageSize, extractedParams.EventsPageSize) + } + if extractedParams.ResultsPage != scenario.params.ResultsPage { + t.Errorf("expected ResultsPage %d, got %d", scenario.params.ResultsPage, extractedParams.ResultsPage) + } + if extractedParams.ResultsPageSize != scenario.params.ResultsPageSize { + t.Errorf("expected ResultsPageSize %d, got %d", scenario.params.ResultsPageSize, extractedParams.ResultsPageSize) + } + }) + } +} diff --git a/storage/store/store.go b/storage/store/store.go index 53ecb1b3..06e45436 100644 --- a/storage/store/store.go +++ b/storage/store/store.go @@ -12,7 +12,7 @@ import ( "github.com/TwiN/gatus/v4/storage/store/sql" ) -// Store is the interface that each stores should implement +// Store is the interface that each store should implement type Store interface { // GetAllEndpointStatuses returns the JSON encoding of all monitored core.EndpointStatus // with a subset of core.Result defined by the page and pageSize parameters @@ -103,7 +103,7 @@ func Initialize(cfg *storage.Config) error { ctx, cancelFunc = context.WithCancel(context.Background()) switch cfg.Type { case storage.TypeSQLite, storage.TypePostgres: - store, err = sql.NewStore(string(cfg.Type), cfg.Path) + store, err = sql.NewStore(string(cfg.Type), cfg.Path, cfg.Caching) if err != nil { return err } diff --git a/storage/store/store_bench_test.go b/storage/store/store_bench_test.go index 800b380c..15f0b3b8 100644 --- a/storage/store/store_bench_test.go +++ b/storage/store/store_bench_test.go @@ -16,7 +16,7 @@ func BenchmarkStore_GetAllEndpointStatuses(b *testing.B) { if err != nil { b.Fatal("failed to create store:", err.Error()) } - sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_GetAllEndpointStatuses.db") + sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_GetAllEndpointStatuses.db", false) if err != nil { b.Fatal("failed to create store:", err.Error()) } @@ -85,7 +85,7 @@ func BenchmarkStore_Insert(b *testing.B) { if err != nil { b.Fatal("failed to create store:", err.Error()) } - sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_Insert.db") + sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_Insert.db", false) if err != nil { b.Fatal("failed to create store:", err.Error()) } @@ -157,7 +157,7 @@ func BenchmarkStore_GetEndpointStatusByKey(b *testing.B) { if err != nil { b.Fatal("failed to create store:", err.Error()) } - sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_GetEndpointStatusByKey.db") + sqliteStore, err := sql.NewStore("sqlite", b.TempDir()+"/BenchmarkStore_GetEndpointStatusByKey.db", false) if err != nil { b.Fatal("failed to create store:", err.Error()) } diff --git a/storage/store/store_test.go b/storage/store/store_test.go index f06f2a69..b5bd6e0d 100644 --- a/storage/store/store_test.go +++ b/storage/store/store_test.go @@ -93,7 +93,11 @@ func initStoresAndBaseScenarios(t *testing.T, testName string) []*Scenario { if err != nil { t.Fatal("failed to create store:", err.Error()) } - sqliteStore, err := sql.NewStore("sqlite", t.TempDir()+"/"+testName+".db") + sqliteStore, err := sql.NewStore("sqlite", t.TempDir()+"/"+testName+".db", false) + if err != nil { + t.Fatal("failed to create store:", err.Error()) + } + sqliteStoreWithCaching, err := sql.NewStore("sqlite", t.TempDir()+"/"+testName+"-with-caching.db", true) if err != nil { t.Fatal("failed to create store:", err.Error()) } @@ -106,6 +110,10 @@ func initStoresAndBaseScenarios(t *testing.T, testName string) []*Scenario { Name: "sqlite", Store: sqliteStore, }, + { + Name: "sqlite-with-caching", + Store: sqliteStoreWithCaching, + }, } }