1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

key updated from central are sent with jetstream

This commit is contained in:
postmannen 2025-01-02 08:33:45 +01:00
parent 714abc85cb
commit 7a292cda7a
3 changed files with 17 additions and 80 deletions

View file

@ -127,7 +127,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
c.pki.nodesAcked.mu.Unlock()
if ok && bytes.Equal(existingKey, msg.Data) {
er := fmt.Errorf("info: public key value for registered node %v is the same, doing nothing", msg.FromNode)
er := fmt.Errorf("info: received hello, where public key value for registered node %v is the same, doing nothing", msg.FromNode)
proc.errorKernel.logDebug(er)
return
}

View file

@ -169,6 +169,7 @@ func (s *server) jetstreamConsume() {
filterSubjectValues := []string{
fmt.Sprintf("NODES.%v", s.nodeName),
"NODES.all",
"NODES.keys",
}
// Check if there are more to consume defined in flags/env.

View file

@ -304,7 +304,7 @@ func methodKeysAllow(proc process, message Message, node string) ([]byte, error)
//
// If there are errors we will return from the function, and send no
// updates.
err := pushKeys(proc, message, []Node{})
err := pushKeys(proc, message)
if err != nil {
proc.errorKernel.errSend(proc, message, err, logWarning)
@ -324,89 +324,31 @@ func methodKeysAllow(proc process, message Message, node string) ([]byte, error)
// The nodes are used when a node or nodes have been deleted from the current
// nodesAcked map since it will contain the nodes that were deleted so we are
// also able to send an update to them as well.
func pushKeys(proc process, message Message, nodes []Node) error {
er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes)
var knh []byte
proc.errorKernel.logDebug(er)
err := func() error {
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err)
return er
}
copy(knh, b)
return nil
}()
if err != nil {
return err
}
// proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
// defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()
// For all nodes that is not ack'ed we try to send an update once.
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
proc.errorKernel.logDebug(er)
msg := Message{
ToNode: n,
Method: KeysDeliverUpdate,
ReplyMethod: None,
ACKTimeout: 0,
}
proc.newMessagesCh <- msg
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
proc.errorKernel.logDebug(er)
}
func pushKeys(proc process, message Message) error {
// Create the data payload of the current allowed keys.
proc.centralAuth.pki.nodesAcked.mu.Lock()
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
proc.centralAuth.pki.nodesAcked.mu.Unlock()
if err != nil {
er := fmt.Errorf("error: methodKeysAllow, failed to marshal keys map: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
// Concatenate the current nodes in the keysAndHash map and the nodes
// we got from the function argument when this function was called.
nodeMap := make(map[Node]struct{})
for n := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
nodeMap[n] = struct{}{}
}
for _, n := range nodes {
nodeMap[n] = struct{}{}
msg := Message{
JetstreamToNode: "keys",
Method: KeysDeliverUpdate,
Data: b,
ReplyMethod: None,
ACKTimeout: 0,
}
// For all nodes that is ack'ed we try to send an update once.
for n := range nodeMap {
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
proc.errorKernel.logDebug(er)
msg := Message{
ToNode: n,
Method: KeysDeliverUpdate,
Data: b,
ReplyMethod: None,
ACKTimeout: 0,
}
proc.server.jetstreamPublishCh <- msg
proc.newMessagesCh <- msg
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
proc.errorKernel.logDebug(er)
}
er := fmt.Errorf("----> methodKeysAllow: sending keys update to all nodes")
proc.errorKernel.logDebug(er)
return nil
@ -452,13 +394,7 @@ func methodKeysDelete(proc process, message Message, node string) ([]byte, error
er = fmt.Errorf(" * DEBUG updated hash for public keys")
proc.errorKernel.logDebug(er)
var nodes []Node
for _, n := range message.MethodArgs {
nodes = append(nodes, Node(n))
}
err := pushKeys(proc, message, nodes)
err := pushKeys(proc, message)
if err != nil {
proc.errorKernel.errSend(proc, message, err, logWarning)