2023-10-04 20:58:42 +00:00
package ctrl
2022-05-19 18:54:33 +00:00
import (
"bytes"
2024-12-13 15:49:21 +00:00
"context"
2022-05-19 18:54:33 +00:00
"encoding/json"
"fmt"
2024-12-13 15:49:21 +00:00
"time"
2022-05-19 18:54:33 +00:00
)
// ---
// Handler to get the public ed25519 key from a node.
2024-11-19 02:48:42 +00:00
func methodPublicKey ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-05-19 18:54:33 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
select {
case <- ctx . Done ( ) :
case outCh <- proc . nodeAuth . SignPublicKey :
}
} ( )
select {
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <- ctx . Done ( ) :
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
} ( )
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
// Handler to get all the public ed25519 keys from a central server.
2024-11-19 02:48:42 +00:00
func methodKeysRequestUpdate ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-05-19 18:54:33 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
// TODO:
// - Since this is implemented as a NACK message we could implement a
// metric thats shows the last time a node did a key request.
// - We could also implement a metrics on the receiver showing the last
// time a node had done an update.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
select {
case <- ctx . Done ( ) :
case outCh <- [ ] byte { } :
}
} ( )
select {
case <- ctx . Done ( ) :
// case out := <-outCh:
case <- outCh :
// Using a func here to set the scope of the lock, and then be able to
// defer the unlock when leaving that scope.
func ( ) {
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( " <---- methodKeysRequestUpdate: received hash from NODE=%v, HASH=%v" , message . FromNode , message . Data )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
// Check if the received hash is the same as the one currently active,
if bytes . Equal ( proc . centralAuth . pki . nodesAcked . keysAndHash . Hash [ : ] , message . Data ) {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "info: methodKeysRequestUpdate: node %v and central have equal keys, nothing to do, exiting key update handler" , message . FromNode )
2022-12-21 06:07:21 +00:00
// proc.errorKernel.infoSend(proc, message, er)
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
return
}
2024-11-19 02:48:42 +00:00
er = fmt . Errorf ( "info: methodKeysRequestUpdate: node %v and central had not equal keys, preparing to send new version of keys" , message . FromNode )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
2024-11-19 02:48:42 +00:00
er = fmt . Errorf ( "info: methodKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v" , proc . centralAuth . pki . nodesAcked . keysAndHash . Keys , proc . centralAuth . pki . nodesAcked . keysAndHash . Hash )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
b , err := json . Marshal ( proc . centralAuth . pki . nodesAcked . keysAndHash )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodKeysRequestUpdate, failed to marshal keys map: %v" , err )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-05-19 18:54:33 +00:00
}
2024-11-19 02:48:42 +00:00
er = fmt . Errorf ( "----> methodKeysRequestUpdate: SENDING KEYS TO NODE=%v" , message . FromNode )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
newReplyMessage ( proc , message , b )
} ( )
}
} ( )
2024-03-27 11:48:17 +00:00
// We're not sending an ACK message for this request type.
2022-05-19 18:54:33 +00:00
return nil , nil
}
2024-12-13 15:49:21 +00:00
// procFuncKeysRequestUpdate defines the startup of a publisher that will send
// KeysRequestUpdate messages to the central server and ask for public keys,
// and get them delivered back with a request of type KeysDeliverUpdate.
//
// One message is sent immediately, and then one for every tick of
// proc.configuration.KeysUpdateInterval seconds. Each message carries the
// hash of the currently stored public keys in Data, so the subscriber on
// central can decide whether it should send an update back.
//
// The function blocks until ctx is done and then returns nil. procFuncCh is
// not used by this procFunc.
func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
	ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.KeysUpdateInterval))
	defer ticker.Stop()

	// logStopped writes the debug log entry used when the publisher stops.
	logStopped := func() {
		er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
		proc.errorKernel.logDebug(er)
	}

	for {
		// Send a message with the hash of the currently stored keys,
		// so we would know on the subscriber at central if it should send
		// an update with new keys back.
		proc.nodeAuth.publicKeys.mu.Lock()

		er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
		proc.errorKernel.logDebug(er)

		m := Message{
			FileName:    "publickeysget.log",
			Directory:   "publickeysget",
			ToNode:      Node(proc.configuration.CentralNodeName),
			FromNode:    Node(proc.node),
			Data:        []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
			Method:      KeysRequestUpdate,
			ReplyMethod: KeysDeliverUpdate,
			ACKTimeout:  proc.configuration.DefaultMessageTimeout,
			Retries:     1,
		}

		proc.nodeAuth.publicKeys.mu.Unlock()

		// Also watch ctx while sending, so the publisher can be stopped
		// even if nothing is currently receiving on newMessagesCh.
		select {
		case proc.newMessagesCh <- m:
		case <-ctx.Done():
			logStopped()
			return nil
		}

		// Wait for the next tick, or stop when the context is done.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			logStopped()
			return nil
		}
	}
}
2022-05-19 18:54:33 +00:00
// ----
2022-05-24 05:21:48 +00:00
// Handler to receive the public keys from a central server.
2024-11-19 02:48:42 +00:00
func methodKeysDeliverUpdate ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-05-19 18:54:33 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
select {
case <- ctx . Done ( ) :
case outCh <- [ ] byte { } :
}
} ( )
select {
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <- ctx . Done ( ) :
case <- outCh :
proc . nodeAuth . publicKeys . mu . Lock ( )
2022-06-01 05:29:25 +00:00
var keysAndHash keysAndHash
err := json . Unmarshal ( message . Data , & keysAndHash )
if err != nil {
er := fmt . Errorf ( "error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v" , err , message )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-06-01 05:29:25 +00:00
}
2022-10-05 07:16:22 +00:00
er := fmt . Errorf ( "<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v" , keysAndHash )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-01 05:29:25 +00:00
// If the received map was empty we also want to delete all the locally stored keys,
// else we copy the marshaled keysAndHash we received from central into our map.
if len ( keysAndHash . Keys ) < 1 {
proc . nodeAuth . publicKeys . keysAndHash = newKeysAndHash ( )
} else {
proc . nodeAuth . publicKeys . keysAndHash = & keysAndHash
}
2022-05-19 18:54:33 +00:00
proc . nodeAuth . publicKeys . mu . Unlock ( )
if err != nil {
2022-05-24 05:21:48 +00:00
er := fmt . Errorf ( "error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v" , err , message )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-05-19 18:54:33 +00:00
}
2022-05-24 13:51:36 +00:00
// We need to also persist the hash on the receiving nodes. We can then load
// that key upon startup.
2022-05-19 18:54:33 +00:00
err = proc . nodeAuth . publicKeys . saveToFile ( )
if err != nil {
2022-05-24 05:21:48 +00:00
er := fmt . Errorf ( "error: REQKeysDeliverUpdate : save to file failed: %v, message: %v" , err , message )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-05-19 18:54:33 +00:00
}
// Prepare and queue for sending a new message with the output
// of the action executed.
// newReplyMessage(proc, message, out)
}
} ( )
// Send back an ACK message.
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return nil , nil
}
// ----
// Handler to allow new public keys into the database on central auth.
// Nodes will send the public key in the REQHello messages. When they
// are recived on the central server they will be put into a temp key
// map, and we need to acknowledge them before they are moved into the
// main key map, and then allowed to be sent out to other nodes.
2024-11-19 02:48:42 +00:00
func methodKeysAllow ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-05-19 18:54:33 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
select {
case <- ctx . Done ( ) :
case outCh <- [ ] byte { } :
}
} ( )
select {
case <- ctx . Done ( ) :
case <- outCh :
proc . centralAuth . pki . nodeNotAckedPublicKeys . mu . Lock ( )
defer proc . centralAuth . pki . nodeNotAckedPublicKeys . mu . Unlock ( )
// Range over all the MethodArgs, where each element represents a node to allow,
// and move the node from the notAcked map to the allowed map.
for _ , n := range message . MethodArgs {
key , ok := proc . centralAuth . pki . nodeNotAckedPublicKeys . KeyMap [ Node ( n ) ]
if ok {
func ( ) {
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
// Store/update the node and public key on the allowed pubKey map.
proc . centralAuth . pki . nodesAcked . keysAndHash . Keys [ Node ( n ) ] = key
} ( )
// Add key to persistent storage.
proc . centralAuth . pki . dbUpdatePublicKey ( string ( n ) , key )
// Delete the key from the NotAcked map
delete ( proc . centralAuth . pki . nodeNotAckedPublicKeys . KeyMap , Node ( n ) )
2022-05-24 05:21:48 +00:00
er := fmt . Errorf ( "info: REQKeysAllow : allowed new/updated public key for %v to allowed public key map" , n )
2022-05-19 18:54:33 +00:00
proc . errorKernel . infoSend ( proc , message , er )
}
}
// All new elements are now added, and we can create a new hash
// representing the current keys in the allowed map.
2022-06-01 13:11:23 +00:00
proc . centralAuth . updateHash ( proc , message )
2022-05-19 18:54:33 +00:00
2022-06-02 05:05:47 +00:00
// If new keys were allowed into the main map, we should send out one
// single update to all the registered nodes to inform of an update.
2024-03-27 11:48:17 +00:00
// If a node is not reachable at the time the update is sent it is
2022-06-02 05:05:47 +00:00
// not a problem since the nodes will periodically check for updates.
2022-05-31 09:05:21 +00:00
//
2022-06-02 05:05:47 +00:00
// If there are errors we will return from the function, and send no
// updates.
2022-06-02 07:36:09 +00:00
err := pushKeys ( proc , message , [ ] Node { } )
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
if err != nil {
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , err , logWarning )
2022-06-02 07:36:09 +00:00
return
}
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
}
} ( )
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
// pushKeys will try to push the the current keys and hashes to each node once.
// As input arguments it takes the current process, the current message, and a
// []Node in nodes.
// The nodes are used when a node or nodes have been deleted from the current
// nodesAcked map since it will contain the nodes that were deleted so we are
// also able to send an update to them as well.
func pushKeys ( proc process , message Message , nodes [ ] Node ) error {
2022-10-05 07:16:22 +00:00
er := fmt . Errorf ( "info: beginning of pushKeys, nodes=%v" , nodes )
2022-06-02 07:36:09 +00:00
var knh [ ] byte
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-02 07:36:09 +00:00
err := func ( ) error {
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
b , err := json . Marshal ( proc . centralAuth . pki . nodesAcked . keysAndHash )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodKeysAllow, failed to marshal keys map: %v" , err )
2022-06-02 07:36:09 +00:00
return er
}
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
copy ( knh , b )
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
return nil
} ( )
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
if err != nil {
return err
}
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
// proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
// defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
2022-06-02 05:05:47 +00:00
2022-06-02 07:36:09 +00:00
// For all nodes that is not ack'ed we try to send an update once.
for n := range proc . centralAuth . pki . nodeNotAckedPublicKeys . KeyMap {
2022-10-05 07:16:22 +00:00
er := fmt . Errorf ( "info: node to send REQKeysDeliverUpdate to:%v " , n )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-02 07:36:09 +00:00
msg := Message {
ToNode : n ,
2024-11-19 02:48:42 +00:00
Method : KeysDeliverUpdate ,
ReplyMethod : None ,
2023-01-05 00:55:52 +00:00
ACKTimeout : 0 ,
2022-06-02 07:36:09 +00:00
}
2022-06-02 05:05:47 +00:00
2024-12-03 15:17:33 +00:00
proc . newMessagesCh <- msg
2022-06-02 07:36:09 +00:00
2024-11-19 02:48:42 +00:00
er = fmt . Errorf ( "----> methodKeysAllow: SENDING KEYS TO NODE=%v" , message . FromNode )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-02 07:36:09 +00:00
}
// Create the data payload of the current allowed keys.
b , err := json . Marshal ( proc . centralAuth . pki . nodesAcked . keysAndHash )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodKeysAllow, failed to marshal keys map: %v" , err )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-06-02 07:36:09 +00:00
}
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
// Concatenate the current nodes in the keysAndHash map and the nodes
// we got from the function argument when this function was called.
nodeMap := make ( map [ Node ] struct { } )
for n := range proc . centralAuth . pki . nodesAcked . keysAndHash . Keys {
nodeMap [ n ] = struct { } { }
}
for _ , n := range nodes {
nodeMap [ n ] = struct { } { }
}
// For all nodes that is ack'ed we try to send an update once.
for n := range nodeMap {
2022-10-05 07:16:22 +00:00
er := fmt . Errorf ( "info: node to send REQKeysDeliverUpdate to:%v " , n )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-02 07:36:09 +00:00
msg := Message {
ToNode : n ,
2024-11-19 02:48:42 +00:00
Method : KeysDeliverUpdate ,
2022-06-02 07:36:09 +00:00
Data : b ,
2024-11-19 02:48:42 +00:00
ReplyMethod : None ,
2023-01-05 00:55:52 +00:00
ACKTimeout : 0 ,
2022-06-02 07:36:09 +00:00
}
2022-06-02 05:05:47 +00:00
2024-12-03 15:17:33 +00:00
proc . newMessagesCh <- msg
2022-06-02 05:05:47 +00:00
2024-11-19 02:48:42 +00:00
er = fmt . Errorf ( "----> methodKeysAllow: sending keys update to node=%v" , message . FromNode )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-02 07:36:09 +00:00
}
2022-05-31 07:28:04 +00:00
2022-06-02 07:36:09 +00:00
return nil
2022-05-19 18:54:33 +00:00
}
2022-06-01 05:29:25 +00:00
2024-11-19 02:48:42 +00:00
func methodKeysDelete ( proc process , message Message , node string ) ( [ ] byte , error ) {
inf := fmt . Errorf ( "<--- methodKeysDelete received from: %v, containing: %v" , message . FromNode , message . MethodArgs )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( inf )
2022-06-01 05:29:25 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
outCh := make ( chan [ ] byte )
errCh := make ( chan error )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 1 :
2024-11-19 02:48:42 +00:00
errCh <- fmt . Errorf ( "error: methodAclGroupNodesDeleteNode: got <1 number methodArgs, want >0" )
2022-06-01 05:29:25 +00:00
return
}
// HERE:
// We want to be able to define more nodes keys to delete.
// Loop over the methodArgs, and call the keyDelete function for each node,
// or rewrite the deleteKey to deleteKeys and it takes a []node as input
// so all keys can be deleted in 1 go, and we create 1 new hash, instead
// of doing it for each node delete.
2022-06-01 13:11:23 +00:00
proc . centralAuth . deletePublicKeys ( proc , message , message . MethodArgs )
2022-10-05 07:16:22 +00:00
er := fmt . Errorf ( "info: Deleted public keys: %v" , message . MethodArgs )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-01 05:29:25 +00:00
// All new elements are now added, and we can create a new hash
// representing the current keys in the allowed map.
2022-06-01 13:11:23 +00:00
proc . centralAuth . updateHash ( proc , message )
2022-10-05 07:16:22 +00:00
er = fmt . Errorf ( " * DEBUG updated hash for public keys" )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-06-01 05:29:25 +00:00
2022-06-02 07:36:09 +00:00
var nodes [ ] Node
for _ , n := range message . MethodArgs {
nodes = append ( nodes , Node ( n ) )
}
err := pushKeys ( proc , message , nodes )
if err != nil {
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , err , logWarning )
2022-06-02 07:36:09 +00:00
return
}
2022-06-01 05:29:25 +00:00
outString := fmt . Sprintf ( "deleted public keys for the nodes=%v\n" , message . MethodArgs )
out := [ ] byte ( outString )
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
}
} ( )
select {
case err := <- errCh :
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , err , logWarning )
2022-06-01 05:29:25 +00:00
case <- ctx . Done ( ) :
cancel ( )
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodAclGroupNodesDeleteNode: method timed out: %v" , message . MethodArgs )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-06-01 05:29:25 +00:00
case out := <- outCh :
2022-06-02 07:36:09 +00:00
2022-06-01 05:29:25 +00:00
newReplyMessage ( proc , message , out )
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}