mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
GT-526 Use agency cache lock in metrics exporter (#1470)
This commit is contained in:
parent
7819c2bb38
commit
70d22edc6d
7 changed files with 82 additions and 40 deletions
|
@ -14,6 +14,7 @@
|
|||
- (Documentation) Update ArangoDeploymentReplication and ArangoLocalStorage CR auto-generated docs
|
||||
- (Feature) Add ArangoMember Message and extend ArangoMember CRD
|
||||
- (Documentation) Use OpenAPI-compatible type names in docs
|
||||
- (Improvement) Use agency cache lock in metrics exporter
|
||||
|
||||
## [1.2.34](https://github.com/arangodb/kube-arangodb/tree/1.2.34) (2023-10-16)
|
||||
- (Bugfix) Fix make manifests-crd-file command
|
||||
|
|
|
@ -144,7 +144,11 @@ type Health interface {
|
|||
|
||||
type Cache interface {
|
||||
Reload(ctx context.Context, size int, clients Connections) (uint64, error)
|
||||
// Deprecated: Use Apply instead.
|
||||
// It can cause Read/Write error when state is reloaded.
|
||||
Data() (state.State, bool)
|
||||
// Apply applies a function to the current state. Returns true if function was applied
|
||||
Apply(f func(state.State)) bool
|
||||
DataDB() (state.DB, bool)
|
||||
CommitIndex() uint64
|
||||
// Health returns true when healthy object is available.
|
||||
|
@ -202,6 +206,10 @@ func (c cacheSingle) Reload(_ context.Context, _ int, _ Connections) (uint64, er
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
func (c cacheSingle) Apply(f func(state.State)) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c cacheSingle) Data() (state.State, bool) {
|
||||
return state.State{}, true
|
||||
}
|
||||
|
@ -232,6 +240,19 @@ func (c *cache) CommitIndex() uint64 {
|
|||
return index
|
||||
}
|
||||
|
||||
func (c *cache) Apply(f func(state.State)) bool {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
|
||||
data, _, ok := c.loader.State()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
f(data.Arango)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *cache) Data() (state.State, bool) {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
|
|
|
@ -159,6 +159,11 @@ func (d *Deployment) GetMembersState() memberState.StateInspector {
|
|||
return d.memberState
|
||||
}
|
||||
|
||||
func (d *Deployment) WithAgencyCache(action func(state.State)) bool {
|
||||
return d.agencyCache.Apply(action)
|
||||
}
|
||||
|
||||
// Deprecated: Use WithAgencyCache instead
|
||||
func (d *Deployment) GetAgencyCache() (state.State, bool) {
|
||||
return d.agencyCache.Data()
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -24,6 +24,7 @@ import (
|
|||
"sync"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
|
||||
"github.com/arangodb/kube-arangodb/pkg/metrics/collector"
|
||||
|
@ -76,8 +77,8 @@ func (i *inventory) CollectMetrics(in metrics.PushMetric) {
|
|||
|
||||
deployment.CollectMetrics(in)
|
||||
|
||||
if state := deployment.acs.CurrentClusterCache(); state != nil {
|
||||
t := state.GetThrottles()
|
||||
if cacheInspector := deployment.acs.CurrentClusterCache(); cacheInspector != nil {
|
||||
t := cacheInspector.GetThrottles()
|
||||
|
||||
for _, c := range definitions.AllComponents() {
|
||||
in.Push(i.operatorStateRefreshMetric.Gauge(float64(t.Get(c).Count()), deployment.GetNamespace(), deployment.GetName(), string(c)))
|
||||
|
@ -92,58 +93,59 @@ func (i *inventory) CollectMetrics(in metrics.PushMetric) {
|
|||
}
|
||||
|
||||
if spec.Mode.Get().HasAgents() {
|
||||
agency, agencyOk := deployment.GetAgencyCache()
|
||||
if !agencyOk {
|
||||
in.Push(i.deploymentAgencyStateMetric.Gauge(0, deployment.GetNamespace(), deployment.GetName()))
|
||||
continue
|
||||
}
|
||||
|
||||
in.Push(i.deploymentAgencyStateMetric.Gauge(1, deployment.GetNamespace(), deployment.GetName()))
|
||||
|
||||
if spec.Mode.Get() == api.DeploymentModeCluster {
|
||||
for db, collections := range agency.Current.Collections {
|
||||
dbName := db
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
dbName = "UNKNOWN"
|
||||
applied := deployment.WithAgencyCache(func(st state.State) {
|
||||
for db, collections := range st.Current.Collections {
|
||||
dbName := db
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
dbName = "UNKNOWN"
|
||||
|
||||
if v, ok := agency.Plan.Databases[db]; ok && v.ID != "" {
|
||||
dbName = v.ID
|
||||
if v, ok := st.Plan.Databases[db]; ok && v.ID != "" {
|
||||
dbName = v.ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for collection, shards := range collections {
|
||||
for shard, details := range shards {
|
||||
for id, server := range details.Servers {
|
||||
collectionName := "UNKNOWN"
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
collectionName = collection
|
||||
} else {
|
||||
if _, ok := agency.Plan.Collections[db]; ok {
|
||||
if _, ok := agency.Plan.Collections[db][collection]; ok {
|
||||
collectionName = agency.Plan.Collections[db][collection].GetName(collectionName)
|
||||
for collection, shards := range collections {
|
||||
for shard, details := range shards {
|
||||
for id, server := range details.Servers {
|
||||
collectionName := "UNKNOWN"
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
collectionName = collection
|
||||
} else {
|
||||
if _, ok := st.Plan.Collections[db]; ok {
|
||||
if _, ok := st.Plan.Collections[db][collection]; ok {
|
||||
collectionName = st.Plan.Collections[db][collection].GetName(collectionName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m := []string{
|
||||
deployment.GetNamespace(),
|
||||
deployment.GetName(),
|
||||
dbName,
|
||||
collectionName,
|
||||
shard,
|
||||
string(server),
|
||||
}
|
||||
m := []string{
|
||||
deployment.GetNamespace(),
|
||||
deployment.GetName(),
|
||||
dbName,
|
||||
collectionName,
|
||||
shard,
|
||||
string(server),
|
||||
}
|
||||
|
||||
if id == 0 {
|
||||
in.Push(i.deploymentShardLeadersMetric.Gauge(1, m...))
|
||||
if id == 0 {
|
||||
in.Push(i.deploymentShardLeadersMetric.Gauge(1, m...))
|
||||
}
|
||||
in.Push(i.deploymentShardsMetric.Gauge(1, m...))
|
||||
}
|
||||
in.Push(i.deploymentShardsMetric.Gauge(1, m...))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if !applied {
|
||||
in.Push(i.deploymentAgencyStateMetric.Gauge(0, deployment.GetNamespace(), deployment.GetName()))
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
in.Push(i.deploymentAgencyStateMetric.Gauge(1, deployment.GetNamespace(), deployment.GetName()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -260,6 +260,10 @@ func (ac *actionContext) ShardsInSyncMap() (state.ShardsSyncStatus, bool) {
|
|||
return ac.context.ShardsInSyncMap()
|
||||
}
|
||||
|
||||
func (ac *actionContext) WithAgencyCache(action func(state.State)) bool {
|
||||
return ac.context.WithAgencyCache(action)
|
||||
}
|
||||
|
||||
func (ac *actionContext) GetAgencyCache() (state.State, bool) {
|
||||
return ac.context.GetAgencyCache()
|
||||
}
|
||||
|
|
|
@ -176,6 +176,10 @@ func (c *testContext) GenerateMemberEndpoint(group api.ServerGroup, member api.M
|
|||
return pod2.GenerateMemberEndpoint(c.Inspector, c.ArangoDeployment, c.ArangoDeployment.Spec, group, member)
|
||||
}
|
||||
|
||||
func (c *testContext) WithAgencyCache(action func(state.State)) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *testContext) GetAgencyCache() (state.State, bool) {
|
||||
return state.State{}, true
|
||||
}
|
||||
|
|
|
@ -96,6 +96,11 @@ type DeploymentImageManager interface {
|
|||
}
|
||||
|
||||
type ArangoAgencyGet interface {
|
||||
// WithAgencyCache executes the given action with the agency cache using a Read lock. Returns true if action was applied
|
||||
WithAgencyCache(action func(state.State)) bool
|
||||
// GetAgencyCache returns the agency cache.
|
||||
// It can cause to Read/Write error when state is reloaded.
|
||||
// It is recommended to use WithAgencyCache instead.
|
||||
GetAgencyCache() (state.State, bool)
|
||||
GetAgencyArangoDBCache() (state.DB, bool)
|
||||
GetAgencyHealth() (agencyCache.Health, bool)
|
||||
|
|
Loading…
Reference in a new issue