1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

added a push update when deleting keys

This commit is contained in:
postmannen 2022-06-02 09:36:09 +02:00
parent 7e8ecc3397
commit bf14ad4d2a

View file

@ -304,83 +304,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
// //
// If there are errors we will return from the function, and send no // If there are errors we will return from the function, and send no
// updates. // updates.
err := func() error { err := pushKeys(proc, message, []Node{})
var knh []byte
err := func() error {
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err)
return er
}
copy(knh, b)
return nil
}()
if err != nil {
return err
}
// proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
// defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
// For all nodes that is not ack'ed we try to send an update once.
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
msg := Message{
ToNode: n,
Method: REQKeysDeliverUpdate,
ReplyMethod: REQNone,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
fmt.Printf("\n ----> methodREQKeysAllow: SENDING KEYS TO NODE=%v\n", message.FromNode)
}
// Create the data payload of the current allowed keys.
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err)
proc.errorKernel.errSend(proc, message, er)
}
// For all nodes that is ack'ed we try to send an update once.
for n := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
msg := Message{
ToNode: n,
Method: REQKeysDeliverUpdate,
Data: b,
ReplyMethod: REQNone,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
log.Printf("\n ----> methodREQKeysAllow: sending keys update to node=%v\n", message.FromNode)
}
return nil
}()
if err != nil { if err != nil {
proc.errorKernel.errSend(proc, message, err) proc.errorKernel.errSend(proc, message, err)
@ -394,6 +318,107 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
return ackMsg, nil return ackMsg, nil
} }
// pushKeys will try to push the the current keys and hashes to each node once.
// As input arguments it takes the current process, the current message, and a
// []Node in nodes.
// The nodes are used when a node or nodes have been deleted from the current
// nodesAcked map since it will contain the nodes that were deleted so we are
// also able to send an update to them as well.
func pushKeys(proc process, message Message, nodes []Node) error {
fmt.Printf(" \n\n\n ************** beginning of pushKeys, nodes=%v\n", nodes)
var knh []byte
err := func() error {
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err)
return er
}
copy(knh, b)
return nil
}()
if err != nil {
return err
}
// proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
// defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
// For all nodes that is not ack'ed we try to send an update once.
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n)
msg := Message{
ToNode: n,
Method: REQKeysDeliverUpdate,
ReplyMethod: REQNone,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
fmt.Printf("\n ----> methodREQKeysAllow: SENDING KEYS TO NODE=%v\n", message.FromNode)
}
// Create the data payload of the current allowed keys.
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err)
proc.errorKernel.errSend(proc, message, er)
}
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
// Concatenate the current nodes in the keysAndHash map and the nodes
// we got from the function argument when this function was called.
nodeMap := make(map[Node]struct{})
for n := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
nodeMap[n] = struct{}{}
}
for _, n := range nodes {
nodeMap[n] = struct{}{}
}
// For all nodes that is ack'ed we try to send an update once.
for n := range nodeMap {
fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n)
msg := Message{
ToNode: n,
Method: REQKeysDeliverUpdate,
Data: b,
ReplyMethod: REQNone,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
log.Printf("\n ----> methodREQKeysAllow: sending keys update to node=%v\n", message.FromNode)
}
return nil
}
type methodREQKeysDelete struct { type methodREQKeysDelete struct {
event Event event Event
} }
@ -440,6 +465,19 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
proc.centralAuth.updateHash(proc, message) proc.centralAuth.updateHash(proc, message)
log.Printf(" * DEBUG updated hash for public keys\n") log.Printf(" * DEBUG updated hash for public keys\n")
var nodes []Node
for _, n := range message.MethodArgs {
nodes = append(nodes, Node(n))
}
err := pushKeys(proc, message, nodes)
if err != nil {
proc.errorKernel.errSend(proc, message, err)
return
}
outString := fmt.Sprintf("deleted public keys for the nodes=%v\n", message.MethodArgs) outString := fmt.Sprintf("deleted public keys for the nodes=%v\n", message.MethodArgs)
out := []byte(outString) out := []byte(outString)
@ -460,8 +498,7 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
proc.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
case out := <-outCh: case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
} }