diff --git a/storage/store/database/database.go b/storage/store/database/database.go index 5bdda4ba..0f4dbed7 100644 --- a/storage/store/database/database.go +++ b/storage/store/database/database.go @@ -47,6 +47,13 @@ func NewStore(driver, path string) (*Store, error) { if store.db, err = sql.Open(driver, path); err != nil { return nil, err } + if driver == "sqlite" { + _, _ = store.db.Exec("PRAGMA foreign_keys=ON") + _, _ = store.db.Exec("PRAGMA journal_mode=WAL") + // Prevents driver from running into "database is locked" errors + // This is because we're using WAL to improve performance + store.db.SetMaxOpenConns(1) + } if err = store.createSchema(); err != nil { _ = store.db.Close() return nil, err @@ -71,7 +78,7 @@ func (s *Store) createSchema() error { _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS service_event ( service_event_id INTEGER PRIMARY KEY, - service_id INTEGER REFERENCES service(id), + service_id INTEGER REFERENCES service(service_id) ON DELETE CASCADE, event_type TEXT, event_timestamp TIMESTAMP ) @@ -82,7 +89,7 @@ func (s *Store) createSchema() error { _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS service_result ( service_result_id INTEGER PRIMARY KEY, - service_id INTEGER REFERENCES service(id), + service_id INTEGER REFERENCES service(service_id) ON DELETE CASCADE, success INTEGER, errors TEXT, connected INTEGER, @@ -101,7 +108,7 @@ func (s *Store) createSchema() error { _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS service_result_condition ( service_result_condition_id INTEGER PRIMARY KEY, - service_result_id INTEGER REFERENCES service_result(service_result_id), + service_result_id INTEGER REFERENCES service_result(service_result_id) ON DELETE CASCADE, condition TEXT, success INTEGER ) @@ -143,6 +150,127 @@ func (s *Store) GetServiceStatusByKey(key string) *core.ServiceStatus { return serviceStatus } +// Insert adds the observed result for the specified service into the store +func (s *Store) Insert(service *core.Service, result *core.Result) { + tx, err := s.db.Begin() + if err != nil { + return + } + //start := time.Now() + serviceID, err := s.getServiceID(tx, service) + if err != nil { + if err == errServiceNotFoundInDatabase { + // Service doesn't exist in the database, insert it + if serviceID, err = s.insertService(tx, service); err != nil { + return // failed to insert service + } + } else { + return + } + } + // First, we need to check if we need to insert a new event. + // + // A new event must be added if either of the following cases happen: + // 1. There is only 1 event. The total number of events for a service can only be 1 if the only existing event is + // of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy + // based on result.Success. + // 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or + // vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy + // based on result.Success. + numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID) + if err != nil { + return + } + if numberOfEvents == 0 { + // There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event + err = s.insertEvent(tx, serviceID, &core.Event{ + Type: core.EventStart, + Timestamp: result.Timestamp.Add(-50 * time.Millisecond), + }) + if err != nil { + // Silently fail + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) + } + event := generateEventBasedOnResult(result) + err = s.insertEvent(tx, serviceID, event) + if err != nil { + // Silently fail + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) + } + } else { + // Get the success value of the previous result + var lastResultSuccess bool + lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) + if err != nil { + log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } else { + // If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. + // If the final outcome (success or failure) of the previous and the new result aren't the same, it means + // that the service either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add + // an event to mark the change in state + if lastResultSuccess != result.Success { + event := generateEventBasedOnResult(result) + err = s.insertEvent(tx, serviceID, event) + if err != nil { + // Silently fail + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) + } + } + } + // Clean up old events if there's more than twice the maximum number of events + // This lets us both keep the table clean without impacting performance too much + // (since we're only deleting MaximumNumberOfEvents at a time instead of 1) + if numberOfEvents > core.MaximumNumberOfEvents*2 { + err = s.deleteOldEvents(tx, serviceID) + if err != nil { + log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + } + } + // Second, we need to insert the result. + err = s.insertResult(tx, serviceID, result) + if err != nil { + // Silently fail + log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + // Clean up old results + numberOfResults, err := s.getNumberOfResultsByServiceID(tx, serviceID) + if err != nil { + return + } + if numberOfResults > core.MaximumNumberOfResults*2 { + err = s.deleteOldResults(tx, serviceID) + if err != nil { + log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + } + //log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) + if err = tx.Commit(); err != nil { + _ = tx.Rollback() + } + return +} + +// DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided +func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { + panic("implement me") +} + +// Clear deletes everything from the store +func (s *Store) Clear() { + _, _ = s.db.Exec("DELETE FROM service") +} + +// Save does nothing, because this store is immediately persistent. +func (s *Store) Save() error { + return nil +} + +// Close the database handle +func (s *Store) Close() { + _ = s.db.Close() +} + func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name string, err error) { rows, err := s.db.Query("SELECT service_id, service_name, service_group FROM service WHERE service_key = $1 LIMIT 1", key) if err != nil { @@ -151,6 +279,7 @@ func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name for rows.Next() { _ = rows.Scan(&id, &name, &group) } + _ = rows.Close() if id == 0 { return 0, "", "", errServiceNotFoundInDatabase } @@ -178,7 +307,8 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, FROM service_result WHERE service_id = $1 ORDER BY timestamp DESC - LIMIT $2`, + LIMIT $2 + `, serviceID, core.MaximumNumberOfResults, ) @@ -222,108 +352,13 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, } _ = rows.Close() } - err = transaction.Commit() - if err != nil { + if err = transaction.Commit(); err != nil { _ = transaction.Rollback() return } return } -// Insert adds the observed result for the specified service into the store -func (s *Store) Insert(service *core.Service, result *core.Result) { - tx, err := s.db.Begin() - if err != nil { - return - } - //start := time.Now() - serviceID, err := s.getServiceID(tx, service) - if err != nil { - if err == errServiceNotFoundInDatabase { - // Service doesn't exist in the database, insert it - if serviceID, err = s.insertService(tx, service); err != nil { - return // failed to insert service - } - } else { - return - } - } - // First, we need to check if we need to insert a new event. - // - // A new event must be added if either of the following cases happen: - // 1. There is only 1 event. The total number of events for a service can only be 1 if the only existing event is - // of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy - // based on result.Success. - // 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or - // vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy - // based on result.Success. - numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID) - if err != nil { - return - } - - // Clean up old events if there's more than twice the maximum number of events - // This lets us both keep the table clean without impacting performance too much - // (since we're only deleting MaximumNumberOfEvents at a time instead of 1) - if numberOfEvents > core.MaximumNumberOfEvents*2 { - err = s.deleteOldEvents(tx, serviceID) - if err != nil { - log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) - } - } - - //log.Printf("there are currently %d events", numberOfEvents) - if numberOfEvents == 0 { - // There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event - err = s.insertEvent(tx, serviceID, &core.Event{ - Type: core.EventStart, - Timestamp: result.Timestamp.Add(-50 * time.Millisecond), - }) - if err != nil { - // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) - } - event := generateEventBasedOnResult(result) - err = s.insertEvent(tx, serviceID, event) - if err != nil { - // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) - } - } else { - // Get the success value of the previous result - var lastResultSuccess bool - lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) - if err != nil { - log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) - } else { - // If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. - // If the final outcome (success or failure) of the previous and the new result aren't the same, it means - // that the service either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add - // an event to mark the change in state - if lastResultSuccess != result.Success { - event := generateEventBasedOnResult(result) - err = s.insertEvent(tx, serviceID, event) - if err != nil { - // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) - } - } - } - } - // Second, we need to insert the result. - err = s.insertResult(tx, serviceID, result) - if err != nil { - // Silently fail - log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) - } - //log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) - err = tx.Commit() - if err != nil { - _ = tx.Rollback() - } - return -} - func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) { rows, err := tx.Query( "SELECT service_id FROM service WHERE service_key = $1", @@ -381,6 +416,25 @@ func (s *Store) getNumberOfEventsByServiceID(tx *sql.Tx, serviceID int64) (int64 return numberOfEvents, nil } +func (s *Store) getNumberOfResultsByServiceID(tx *sql.Tx, serviceID int64) (int64, error) { + rows, err := tx.Query("SELECT COUNT(1) FROM service_result WHERE service_id = $1", serviceID) + if err != nil { + return 0, err + } + var numberOfResults int64 + var found bool + for rows.Next() { + _ = rows.Scan(&numberOfResults) + found = true + break + } + _ = rows.Close() + if !found { + return 0, errNoRowsReturned + } + return numberOfResults, nil +} + // insertEvent inserts a service event in the store func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) error { _, err := tx.Exec( @@ -459,9 +513,9 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit return nil } -// insertService inserts a service in the store and returns the generated id of said service +// deleteOldEvents deletes old service events that are no longer needed func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { - result, err := tx.Exec( + _, err := tx.Exec( ` DELETE FROM service_event WHERE service_id = $1 @@ -479,27 +533,32 @@ func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { if err != nil { return err } - rowsAffected, _ := result.RowsAffected() - log.Printf("deleted %d rows", rowsAffected) + //rowsAffected, _ := result.RowsAffected() + //log.Printf("deleted %d rows from service_event", rowsAffected) return nil } -// DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided -func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { - panic("implement me") -} - -// Clear deletes everything from the store -func (s *Store) Clear() { - panic("implement me") -} - -// Save does nothing, because this store is immediately persistent. -func (s *Store) Save() error { +// deleteOldResults deletes old service results that are no longer needed +func (s *Store) deleteOldResults(tx *sql.Tx, serviceID int64) error { + _, err := tx.Exec( + ` + DELETE FROM service_result + WHERE service_id = $1 + AND service_result_id NOT IN ( + SELECT service_result_id + FROM service_result + WHERE service_id = $1 + ORDER BY service_result_id DESC + LIMIT $2 + ) + `, + serviceID, + core.MaximumNumberOfResults, + ) + if err != nil { + return err + } + //rowsAffected, _ := result.RowsAffected() + //log.Printf("deleted %d rows from service_result", rowsAffected) return nil } - -// Close the database handle -func (s *Store) Close() { - _ = s.db.Close() -} diff --git a/storage/store/store.go b/storage/store/store.go index 551ed6db..64f1fd02 100644 --- a/storage/store/store.go +++ b/storage/store/store.go @@ -33,6 +33,8 @@ type Store interface { Save() error } +// TODO: add method to check state of store (by keeping track of silent errors) + var ( // Validate interface implementation on compile _ Store = (*memory.Store)(nil)