diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index 44405d8..ebc7a26 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -127,7 +127,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { c.pki.nodesAcked.mu.Unlock() if ok && bytes.Equal(existingKey, msg.Data) { - er := fmt.Errorf("info: public key value for registered node %v is the same, doing nothing", msg.FromNode) + er := fmt.Errorf("info: received hello, where public key value for registered node %v is the same, doing nothing", msg.FromNode) proc.errorKernel.logDebug(er) return } diff --git a/message_readers.go b/message_readers.go index c50c557..ad4d048 100644 --- a/message_readers.go +++ b/message_readers.go @@ -169,6 +169,7 @@ func (s *server) jetstreamConsume() { filterSubjectValues := []string{ fmt.Sprintf("NODES.%v", s.nodeName), "NODES.all", + "NODES.keys", } // Check if there are more to consume defined in flags/env. diff --git a/requests_keys.go b/requests_keys.go index 0fa0677..bf7addb 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -304,7 +304,7 @@ func methodKeysAllow(proc process, message Message, node string) ([]byte, error) // // If there are errors we will return from the function, and send no // updates. - err := pushKeys(proc, message, []Node{}) + err := pushKeys(proc, message) if err != nil { proc.errorKernel.errSend(proc, message, err, logWarning) @@ -324,89 +324,31 @@ func methodKeysAllow(proc process, message Message, node string) ([]byte, error) // The nodes are used when a node or nodes have been deleted from the current // nodesAcked map since it will contain the nodes that were deleted so we are // also able to send an update to them as well. -func pushKeys(proc process, message Message, nodes []Node) error { - er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes) - var knh []byte - proc.errorKernel.logDebug(er) - - err := func() error { - proc.centralAuth.pki.nodesAcked.mu.Lock() - defer proc.centralAuth.pki.nodesAcked.mu.Unlock() - - b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash) - if err != nil { - er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err) - return er - } - - copy(knh, b) - - return nil - }() - - if err != nil { - return err - } - - // proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock() - // defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock() - - // For all nodes that is not ack'ed we try to send an update once. - for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap { - er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) - proc.errorKernel.logDebug(er) - msg := Message{ - ToNode: n, - Method: KeysDeliverUpdate, - ReplyMethod: None, - ACKTimeout: 0, - } - - proc.newMessagesCh <- msg - - er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) - proc.errorKernel.logDebug(er) - } +func pushKeys(proc process, message Message) error { // Create the data payload of the current allowed keys. + proc.centralAuth.pki.nodesAcked.mu.Lock() b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash) + proc.centralAuth.pki.nodesAcked.mu.Unlock() if err != nil { er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err) proc.errorKernel.errSend(proc, message, er, logWarning) + return er } - proc.centralAuth.pki.nodesAcked.mu.Lock() - defer proc.centralAuth.pki.nodesAcked.mu.Unlock() - - // Concatenate the current nodes in the keysAndHash map and the nodes - // we got from the function argument when this function was called. - nodeMap := make(map[Node]struct{}) - - for n := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys { - nodeMap[n] = struct{}{} - } - for _, n := range nodes { - nodeMap[n] = struct{}{} + msg := Message{ + JetstreamToNode: "keys", + Method: KeysDeliverUpdate, + Data: b, + ReplyMethod: None, + ACKTimeout: 0, } - // For all nodes that is ack'ed we try to send an update once. - for n := range nodeMap { - er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) - proc.errorKernel.logDebug(er) - msg := Message{ - ToNode: n, - Method: KeysDeliverUpdate, - Data: b, - ReplyMethod: None, - ACKTimeout: 0, - } + proc.server.jetstreamPublishCh <- msg - proc.newMessagesCh <- msg - - er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode) - proc.errorKernel.logDebug(er) - } + er := fmt.Errorf("----> methodKeysAllow: sending keys update to all nodes") + proc.errorKernel.logDebug(er) return nil @@ -452,13 +394,7 @@ func methodKeysDelete(proc process, message Message, node string) ([]byte, error er = fmt.Errorf(" * DEBUG updated hash for public keys") proc.errorKernel.logDebug(er) - var nodes []Node - - for _, n := range message.MethodArgs { - nodes = append(nodes, Node(n)) - } - - err := pushKeys(proc, message, nodes) + err := pushKeys(proc, message) if err != nil { proc.errorKernel.errSend(proc, message, err, logWarning)