2020-03-07 12:53:37 +05:30
package policystatus
2020-02-25 20:55:07 +05:30
import (
2020-03-07 16:23:17 +05:30
"encoding/json"
2020-03-20 11:43:21 -07:00
"fmt"
2020-02-25 20:55:07 +05:30
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/nirmata/kyverno/pkg/client/clientset/versioned"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
2020-03-20 11:43:21 -07:00
log "sigs.k8s.io/controller-runtime/pkg/log"
2020-02-25 20:55:07 +05:30
)
2020-03-07 16:23:17 +05:30
// Policy status implementation works in the following way,
//Currently policy status maintains a cache of the status of
//each policy.
//Every x unit of time the status of policy is updated using
//the data from the cache.
//The sync exposes a listener which accepts a statusUpdater
//interface which dictates how the status should be updated.
//The status is updated by a worker that receives the interface
//on a channel.
//The worker then updates the current status using the methods
//exposed by the interface.
//Current implementation is designed to be threadsafe with optimised
//locking for each policy.
// statusUpdater defines a type to have a method which
//updates the given status
2020-02-29 22:39:27 +05:30
type statusUpdater interface {
2020-03-04 15:45:20 +05:30
PolicyName ( ) string
UpdateStatus ( status v1 . PolicyStatus ) v1 . PolicyStatus
2020-02-29 22:39:27 +05:30
}
type policyStore interface {
Get ( policyName string ) ( * v1 . ClusterPolicy , error )
}
2020-03-07 12:53:37 +05:30
type Listener chan statusUpdater
func ( l Listener ) Send ( s statusUpdater ) {
l <- s
}
2020-03-07 16:23:17 +05:30
// Sync is the object which is used to initialize
//the policyStatus sync, can be considered the parent object
//since it contains access to all the persistant data present
//in this package.
2020-02-25 20:55:07 +05:30
type Sync struct {
2020-03-04 15:45:20 +05:30
cache * cache
2020-03-07 12:53:37 +05:30
Listener Listener
2020-02-25 20:55:07 +05:30
client * versioned . Clientset
2020-03-04 15:45:20 +05:30
policyStore policyStore
2020-02-25 20:55:07 +05:30
}
type cache struct {
2020-03-07 14:56:42 +05:30
dataMu sync . RWMutex
data map [ string ] v1 . PolicyStatus
keyToMutex * keyToMutex
2020-02-25 20:55:07 +05:30
}
2020-02-29 22:39:27 +05:30
func NewSync ( c * versioned . Clientset , p policyStore ) * Sync {
2020-02-25 20:55:07 +05:30
return & Sync {
2020-03-04 15:45:20 +05:30
cache : & cache {
2020-03-07 14:56:42 +05:30
dataMu : sync . RWMutex { } ,
data : make ( map [ string ] v1 . PolicyStatus ) ,
keyToMutex : newKeyToMutex ( ) ,
2020-02-25 20:55:07 +05:30
} ,
client : c ,
2020-03-04 15:45:20 +05:30
policyStore : p ,
2020-03-04 13:35:49 +05:30
Listener : make ( chan statusUpdater , 20 ) ,
2020-02-25 20:55:07 +05:30
}
}
2020-02-29 17:19:00 +05:30
func ( s * Sync ) Run ( workers int , stopCh <- chan struct { } ) {
2020-02-25 21:07:00 +05:30
for i := 0 ; i < workers ; i ++ {
2020-02-29 17:19:00 +05:30
go s . updateStatusCache ( stopCh )
2020-02-25 21:07:00 +05:30
}
2020-04-12 18:58:55 +05:30
wait . Until ( s . updatePolicyStatus , 10 * time . Second , stopCh )
2020-02-29 17:19:00 +05:30
<- stopCh
2020-02-25 20:55:07 +05:30
}
2020-03-07 16:23:17 +05:30
// updateStatusCache is a worker which updates the current status
//using the statusUpdater interface
2020-02-29 17:19:00 +05:30
func ( s * Sync ) updateStatusCache ( stopCh <- chan struct { } ) {
2020-02-25 20:55:07 +05:30
for {
select {
2020-02-29 22:39:27 +05:30
case statusUpdater := <- s . Listener :
2020-03-07 14:56:42 +05:30
s . cache . keyToMutex . Get ( statusUpdater . PolicyName ( ) ) . Lock ( )
2020-03-04 15:45:20 +05:30
2020-03-07 14:56:42 +05:30
s . cache . dataMu . RLock ( )
2020-03-04 15:45:20 +05:30
status , exist := s . cache . data [ statusUpdater . PolicyName ( ) ]
2020-03-07 14:56:42 +05:30
s . cache . dataMu . RUnlock ( )
2020-03-04 15:45:20 +05:30
if ! exist {
policy , _ := s . policyStore . Get ( statusUpdater . PolicyName ( ) )
if policy != nil {
status = policy . Status
}
}
2020-03-07 14:56:42 +05:30
updatedStatus := statusUpdater . UpdateStatus ( status )
2020-03-04 15:45:20 +05:30
2020-03-07 14:56:42 +05:30
s . cache . dataMu . Lock ( )
s . cache . data [ statusUpdater . PolicyName ( ) ] = updatedStatus
s . cache . dataMu . Unlock ( )
s . cache . keyToMutex . Get ( statusUpdater . PolicyName ( ) ) . Unlock ( )
2020-03-07 16:23:17 +05:30
oldStatus , _ := json . Marshal ( status )
newStatus , _ := json . Marshal ( updatedStatus )
2020-03-20 11:43:21 -07:00
log . Log . V ( 4 ) . Info ( fmt . Sprintf ( "\nupdated status of policy - %v\noldStatus:\n%v\nnewStatus:\n%v\n" , statusUpdater . PolicyName ( ) , string ( oldStatus ) , string ( newStatus ) ) )
2020-02-29 17:19:00 +05:30
case <- stopCh :
2020-02-25 20:55:07 +05:30
return
}
}
}
2020-03-07 16:23:17 +05:30
// updatePolicyStatus updates the status in the policy resource definition
//from the status cache, syncing them
2020-02-25 20:55:07 +05:30
func ( s * Sync ) updatePolicyStatus ( ) {
2020-03-07 14:56:42 +05:30
s . cache . dataMu . Lock ( )
2020-03-04 15:45:20 +05:30
var nameToStatus = make ( map [ string ] v1 . PolicyStatus , len ( s . cache . data ) )
for k , v := range s . cache . data {
2020-02-25 20:55:07 +05:30
nameToStatus [ k ] = v
}
2020-03-07 14:56:42 +05:30
s . cache . dataMu . Unlock ( )
2020-02-25 20:55:07 +05:30
for policyName , status := range nameToStatus {
2020-03-04 15:45:20 +05:30
policy , err := s . policyStore . Get ( policyName )
2020-02-25 20:55:07 +05:30
if err != nil {
continue
}
policy . Status = status
_ , err = s . client . KyvernoV1 ( ) . ClusterPolicies ( ) . UpdateStatus ( policy )
if err != nil {
2020-03-07 14:56:42 +05:30
s . cache . dataMu . Lock ( )
2020-03-04 15:45:20 +05:30
delete ( s . cache . data , policyName )
2020-03-07 14:56:42 +05:30
s . cache . dataMu . Unlock ( )
2020-03-20 11:43:21 -07:00
log . Log . Error ( err , "failed to update policy status" )
2020-02-25 20:55:07 +05:30
}
}
}