mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
501 lines
16 KiB
Go
501 lines
16 KiB
Go
package ctrl
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// ---
|
|
|
|
// Handler to get the public ed25519 key from a node.
|
|
func methodPublicKey(proc process, message Message, node string) ([]byte, error) {
|
|
// Get a context with the timeout specified in message.MethodTimeout.
|
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
outCh := make(chan []byte)
|
|
|
|
go func() {
|
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
|
// In this case this go func and the below select is not needed, but keeping it so the
|
|
// structure is the same as the other handlers.
|
|
select {
|
|
case <-ctx.Done():
|
|
case outCh <- proc.nodeAuth.SignPublicKey:
|
|
}
|
|
}()
|
|
|
|
select {
|
|
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
|
|
case <-ctx.Done():
|
|
case out := <-outCh:
|
|
|
|
// Prepare and queue for sending a new message with the output
|
|
// of the action executed.
|
|
newReplyMessage(proc, message, out)
|
|
}
|
|
}()
|
|
|
|
// Send back an ACK message.
|
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
|
return ackMsg, nil
|
|
}
|
|
|
|
// ----
|
|
|
|
// Handler to get all the public ed25519 keys from a central server.
|
|
func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte, error) {
|
|
// Get a context with the timeout specified in message.MethodTimeout.
|
|
|
|
// TODO:
|
|
// - Since this is implemented as a NACK message we could implement a
|
|
// metric thats shows the last time a node did a key request.
|
|
// - We could also implement a metrics on the receiver showing the last
|
|
// time a node had done an update.
|
|
|
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
outCh := make(chan []byte)
|
|
|
|
go func() {
|
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
|
// In this case this go func and the below select is not needed, but keeping it so the
|
|
// structure is the same as the other handlers.
|
|
select {
|
|
case <-ctx.Done():
|
|
case outCh <- []byte{}:
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
// case out := <-outCh:
|
|
case <-outCh:
|
|
// Using a func here to set the scope of the lock, and then be able to
|
|
// defer the unlock when leaving that scope.
|
|
func() {
|
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
|
|
|
er := fmt.Errorf(" <---- methodKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data)
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
// Check if the received hash is the same as the one currently active,
|
|
if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
|
|
er := fmt.Errorf("info: methodKeysRequestUpdate: node %v and central have equal keys, nothing to do, exiting key update handler", message.FromNode)
|
|
// proc.errorKernel.infoSend(proc, message, er)
|
|
proc.errorKernel.logDebug(er)
|
|
return
|
|
}
|
|
|
|
er = fmt.Errorf("info: methodKeysRequestUpdate: node %v and central had not equal keys, preparing to send new version of keys", message.FromNode)
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
er = fmt.Errorf("info: methodKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
|
|
|
if err != nil {
|
|
er := fmt.Errorf("error: methodKeysRequestUpdate, failed to marshal keys map: %v", err)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
}
|
|
er = fmt.Errorf("----> methodKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode)
|
|
proc.errorKernel.logDebug(er)
|
|
newReplyMessage(proc, message, b)
|
|
}()
|
|
}
|
|
}()
|
|
|
|
// We're not sending an ACK message for this request type.
|
|
return nil, nil
|
|
}
|
|
|
|
// Define the startup of a publisher that will send KeysRequestUpdate
|
|
// to central server and ask for publics keys, and to get them deliver back with a request
|
|
// of type KeysDeliverUpdate.
|
|
func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
|
|
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.KeysUpdateInterval))
|
|
defer ticker.Stop()
|
|
for {
|
|
|
|
// Send a message with the hash of the currently stored keys,
|
|
// so we would know on the subscriber at central if it should send
|
|
// and update with new keys back.
|
|
|
|
proc.nodeAuth.publicKeys.mu.Lock()
|
|
er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
m := Message{
|
|
FileName: "publickeysget.log",
|
|
Directory: "publickeysget",
|
|
ToNode: Node(proc.configuration.CentralNodeName),
|
|
FromNode: Node(proc.node),
|
|
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
|
|
Method: KeysRequestUpdate,
|
|
ReplyMethod: KeysDeliverUpdate,
|
|
ACKTimeout: proc.configuration.DefaultMessageTimeout,
|
|
Retries: 1,
|
|
}
|
|
proc.nodeAuth.publicKeys.mu.Unlock()
|
|
|
|
proc.newMessagesCh <- m
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
case <-ctx.Done():
|
|
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
|
|
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
|
proc.errorKernel.logDebug(er)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// ----
|
|
|
|
// Handler to receive the public keys from a central server.
|
|
func methodKeysDeliverUpdate(proc process, message Message, node string) ([]byte, error) {
|
|
// Get a context with the timeout specified in message.MethodTimeout.
|
|
|
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
outCh := make(chan []byte)
|
|
|
|
go func() {
|
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
|
// In this case this go func and the below select is not needed, but keeping it so the
|
|
// structure is the same as the other handlers.
|
|
select {
|
|
case <-ctx.Done():
|
|
case outCh <- []byte{}:
|
|
}
|
|
}()
|
|
|
|
select {
|
|
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
|
|
case <-ctx.Done():
|
|
case <-outCh:
|
|
|
|
proc.nodeAuth.publicKeys.mu.Lock()
|
|
|
|
var keysAndHash keysAndHash
|
|
|
|
err := json.Unmarshal(message.Data, &keysAndHash)
|
|
if err != nil {
|
|
er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
}
|
|
|
|
er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash)
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
// If the received map was empty we also want to delete all the locally stored keys,
|
|
// else we copy the marshaled keysAndHash we received from central into our map.
|
|
if len(keysAndHash.Keys) < 1 {
|
|
proc.nodeAuth.publicKeys.keysAndHash = newKeysAndHash()
|
|
} else {
|
|
proc.nodeAuth.publicKeys.keysAndHash = &keysAndHash
|
|
}
|
|
|
|
proc.nodeAuth.publicKeys.mu.Unlock()
|
|
|
|
if err != nil {
|
|
er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
}
|
|
|
|
// We need to also persist the hash on the receiving nodes. We can then load
|
|
// that key upon startup.
|
|
|
|
err = proc.nodeAuth.publicKeys.saveToFile()
|
|
if err != nil {
|
|
er := fmt.Errorf("error: REQKeysDeliverUpdate : save to file failed: %v, message: %v", err, message)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
}
|
|
|
|
// Prepare and queue for sending a new message with the output
|
|
// of the action executed.
|
|
// newReplyMessage(proc, message, out)
|
|
}
|
|
}()
|
|
|
|
// Send back an ACK message.
|
|
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
|
return nil, nil
|
|
}
|
|
|
|
// ----
|
|
|
|
// Handler to allow new public keys into the database on central auth.
|
|
// Nodes will send the public key in the REQHello messages. When they
|
|
// are recived on the central server they will be put into a temp key
|
|
// map, and we need to acknowledge them before they are moved into the
|
|
// main key map, and then allowed to be sent out to other nodes.
|
|
func methodKeysAllow(proc process, message Message, node string) ([]byte, error) {
|
|
// Get a context with the timeout specified in message.MethodTimeout.
|
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
outCh := make(chan []byte)
|
|
|
|
go func() {
|
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
|
// In this case this go func and the below select is not needed, but keeping it so the
|
|
// structure is the same as the other handlers.
|
|
select {
|
|
case <-ctx.Done():
|
|
case outCh <- []byte{}:
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-outCh:
|
|
proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
|
|
defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
|
|
|
|
// Range over all the MethodArgs, where each element represents a node to allow,
|
|
// and move the node from the notAcked map to the allowed map.
|
|
for _, n := range message.MethodArgs {
|
|
key, ok := proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap[Node(n)]
|
|
if ok {
|
|
|
|
func() {
|
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
|
|
|
// Store/update the node and public key on the allowed pubKey map.
|
|
proc.centralAuth.pki.nodesAcked.keysAndHash.Keys[Node(n)] = key
|
|
}()
|
|
|
|
// Add key to persistent storage.
|
|
proc.centralAuth.pki.dbUpdatePublicKey(string(n), key)
|
|
|
|
// Delete the key from the NotAcked map
|
|
delete(proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap, Node(n))
|
|
|
|
er := fmt.Errorf("info: REQKeysAllow : allowed new/updated public key for %v to allowed public key map", n)
|
|
proc.errorKernel.infoSend(proc, message, er)
|
|
}
|
|
}
|
|
|
|
// All new elements are now added, and we can create a new hash
|
|
// representing the current keys in the allowed map.
|
|
proc.centralAuth.updateHash(proc, message)
|
|
|
|
// If new keys were allowed into the main map, we should send out one
|
|
// single update to all the registered nodes to inform of an update.
|
|
// If a node is not reachable at the time the update is sent it is
|
|
// not a problem since the nodes will periodically check for updates.
|
|
//
|
|
// If there are errors we will return from the function, and send no
|
|
// updates.
|
|
err := pushKeys(proc, message, []Node{})
|
|
|
|
if err != nil {
|
|
proc.errorKernel.errSend(proc, message, err, logWarning)
|
|
return
|
|
}
|
|
|
|
}
|
|
}()
|
|
|
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
|
return ackMsg, nil
|
|
}
|
|
|
|
// pushKeys will try to push the the current keys and hashes to each node once.
|
|
// As input arguments it takes the current process, the current message, and a
|
|
// []Node in nodes.
|
|
// The nodes are used when a node or nodes have been deleted from the current
|
|
// nodesAcked map since it will contain the nodes that were deleted so we are
|
|
// also able to send an update to them as well.
|
|
func pushKeys(proc process, message Message, nodes []Node) error {
|
|
er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes)
|
|
var knh []byte
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
err := func() error {
|
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
|
|
|
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
|
if err != nil {
|
|
er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err)
|
|
return er
|
|
}
|
|
|
|
copy(knh, b)
|
|
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
|
|
// defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
|
|
|
|
// For all nodes that is not ack'ed we try to send an update once.
|
|
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
|
|
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
|
proc.errorKernel.logDebug(er)
|
|
msg := Message{
|
|
ToNode: n,
|
|
Method: KeysDeliverUpdate,
|
|
ReplyMethod: None,
|
|
ACKTimeout: 0,
|
|
}
|
|
|
|
proc.newMessagesCh <- msg
|
|
|
|
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
|
|
proc.errorKernel.logDebug(er)
|
|
}
|
|
|
|
// Create the data payload of the current allowed keys.
|
|
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
|
|
|
if err != nil {
|
|
er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
}
|
|
|
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
|
|
|
// Concatenate the current nodes in the keysAndHash map and the nodes
|
|
// we got from the function argument when this function was called.
|
|
nodeMap := make(map[Node]struct{})
|
|
|
|
for n := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
|
|
nodeMap[n] = struct{}{}
|
|
}
|
|
for _, n := range nodes {
|
|
nodeMap[n] = struct{}{}
|
|
}
|
|
|
|
// For all nodes that is ack'ed we try to send an update once.
|
|
for n := range nodeMap {
|
|
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
|
proc.errorKernel.logDebug(er)
|
|
msg := Message{
|
|
ToNode: n,
|
|
Method: KeysDeliverUpdate,
|
|
Data: b,
|
|
ReplyMethod: None,
|
|
ACKTimeout: 0,
|
|
}
|
|
|
|
proc.newMessagesCh <- msg
|
|
|
|
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
|
|
proc.errorKernel.logDebug(er)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func methodKeysDelete(proc process, message Message, node string) ([]byte, error) {
|
|
inf := fmt.Errorf("<--- methodKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
|
proc.errorKernel.logDebug(inf)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
|
|
// Get a context with the timeout specified in message.MethodTimeout.
|
|
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
|
outCh := make(chan []byte)
|
|
errCh := make(chan error)
|
|
|
|
proc.processes.wg.Add(1)
|
|
go func() {
|
|
defer proc.processes.wg.Done()
|
|
|
|
switch {
|
|
case len(message.MethodArgs) < 1:
|
|
errCh <- fmt.Errorf("error: methodAclGroupNodesDeleteNode: got <1 number methodArgs, want >0")
|
|
return
|
|
}
|
|
|
|
// HERE:
|
|
// We want to be able to define more nodes keys to delete.
|
|
// Loop over the methodArgs, and call the keyDelete function for each node,
|
|
// or rewrite the deleteKey to deleteKeys and it takes a []node as input
|
|
// so all keys can be deleted in 1 go, and we create 1 new hash, instead
|
|
// of doing it for each node delete.
|
|
|
|
proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs)
|
|
er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs)
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
// All new elements are now added, and we can create a new hash
|
|
// representing the current keys in the allowed map.
|
|
proc.centralAuth.updateHash(proc, message)
|
|
er = fmt.Errorf(" * DEBUG updated hash for public keys")
|
|
proc.errorKernel.logDebug(er)
|
|
|
|
var nodes []Node
|
|
|
|
for _, n := range message.MethodArgs {
|
|
nodes = append(nodes, Node(n))
|
|
}
|
|
|
|
err := pushKeys(proc, message, nodes)
|
|
|
|
if err != nil {
|
|
proc.errorKernel.errSend(proc, message, err, logWarning)
|
|
return
|
|
}
|
|
|
|
outString := fmt.Sprintf("deleted public keys for the nodes=%v\n", message.MethodArgs)
|
|
out := []byte(outString)
|
|
|
|
select {
|
|
case outCh <- out:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
proc.errorKernel.errSend(proc, message, err, logWarning)
|
|
|
|
case <-ctx.Done():
|
|
cancel()
|
|
er := fmt.Errorf("error: methodAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs)
|
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
|
|
|
case out := <-outCh:
|
|
|
|
newReplyMessage(proc, message, out)
|
|
}
|
|
|
|
}()
|
|
|
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
|
return ackMsg, nil
|
|
}
|