mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-20 22:52:13 +00:00
fixed debug logging
This commit is contained in:
parent
9c15e6dbfd
commit
0249a86040
5 changed files with 52 additions and 32 deletions
|
@ -119,7 +119,8 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
||||||
c.pki.nodesAcked.mu.Unlock()
|
c.pki.nodesAcked.mu.Unlock()
|
||||||
|
|
||||||
if ok && bytes.Equal(existingKey, msg.Data) {
|
if ok && bytes.Equal(existingKey, msg.Data) {
|
||||||
fmt.Printf(" \n * public key value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode)
|
er := fmt.Errorf("info: public key value for REGISTERED node %v is the same, doing nothing", msg.FromNode)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,10 +156,13 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes)
|
err := c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes)
|
||||||
|
if err != nil {
|
||||||
|
proc.errorKernel.errSend(proc, msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
||||||
fmt.Printf(" * %v\n", er)
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
c.pki.errorKernel.infoSend(proc, msg, er)
|
c.pki.errorKernel.infoSend(proc, msg, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,7 +192,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string
|
||||||
// return value, err
|
// return value, err
|
||||||
// }
|
// }
|
||||||
|
|
||||||
//dbUpdatePublicKey will update the public key for a node in the db.
|
// dbUpdatePublicKey will update the public key for a node in the db.
|
||||||
func (p *pki) dbUpdatePublicKey(node string, value []byte) error {
|
func (p *pki) dbUpdatePublicKey(node string, value []byte) error {
|
||||||
err := p.db.Update(func(tx *bolt.Tx) error {
|
err := p.db.Update(func(tx *bolt.Tx) error {
|
||||||
//Create a bucket
|
//Create a bucket
|
||||||
|
@ -218,7 +222,9 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error {
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
err := bu.Delete([]byte(n))
|
err := bu.Delete([]byte(n))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err)
|
er := fmt.Errorf("error: delete key in bucket %v failed: %v", bucket, err)
|
||||||
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +234,7 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//dbUpdateHash will update the public key for a node in the db.
|
// dbUpdateHash will update the public key for a node in the db.
|
||||||
func (p *pki) dbUpdateHash(hash []byte) error {
|
func (p *pki) dbUpdateHash(hash []byte) error {
|
||||||
err := p.db.Update(func(tx *bolt.Tx) error {
|
err := p.db.Update(func(tx *bolt.Tx) error {
|
||||||
//Create a bucket
|
//Create a bucket
|
||||||
|
|
|
@ -253,7 +253,8 @@ func (p process) spawnWorker() {
|
||||||
p.processes.active.procNames[p.processName] = p
|
p.processes.active.procNames[p.processName] = p
|
||||||
p.processes.active.mu.Unlock()
|
p.processes.active.mu.Unlock()
|
||||||
|
|
||||||
log.Printf("Successfully started process: %v\n", p.processName)
|
er := fmt.Errorf("Successfully started process: %v\n", p.processName)
|
||||||
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -720,7 +720,8 @@ func (p *processes) printProcessesMap() {
|
||||||
p.active.mu.Lock()
|
p.active.mu.Lock()
|
||||||
|
|
||||||
for pName, proc := range p.active.procNames {
|
for pName, proc := range p.active.procNames {
|
||||||
log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name())
|
er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name())
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
|
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/fxamacker/cbor/v2"
|
"github.com/fxamacker/cbor/v2"
|
||||||
)
|
)
|
||||||
|
@ -53,19 +52,23 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s
|
||||||
proc.centralAuth.accessLists.schemaGenerated.mu.Lock()
|
proc.centralAuth.accessLists.schemaGenerated.mu.Lock()
|
||||||
defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock()
|
defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock()
|
||||||
|
|
||||||
log.Printf(" ---- subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v\n", message.FromNode, message.Data)
|
er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v", message.FromNode, message.Data)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
// Check if the received hash is the same as the one currently active,
|
// Check if the received hash is the same as the one currently active,
|
||||||
// If it is the same we exit the handler immediately.
|
// If it is the same we exit the handler immediately.
|
||||||
hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash
|
hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash
|
||||||
hash := hash32[:]
|
hash := hash32[:]
|
||||||
log.Printf("---- subscriber methodREQAclRequestUpdate: the central acl hash=%v\n", hash32)
|
er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: the central acl hash=%v", hash32)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
if bytes.Equal(hash, message.Data) {
|
if bytes.Equal(hash, message.Data) {
|
||||||
log.Printf("---- subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER\n")
|
er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER")
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("---- subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl\n")
|
er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl")
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
// Generate JSON for Message.Data
|
// Generate JSON for Message.Data
|
||||||
|
|
||||||
|
@ -79,10 +82,10 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: REQAclRequestUpdate : json marshal failed: %v, message: %v", err, message)
|
er := fmt.Errorf("error: REQAclRequestUpdate : json marshal failed: %v, message: %v", err, message)
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
log.Fatalf("%v\n", er)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf(" ----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v\n", message.FromNode, hdh)
|
er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
newReplyMessage(proc, message, js)
|
newReplyMessage(proc, message, js)
|
||||||
}()
|
}()
|
||||||
|
@ -143,7 +146,6 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
log.Fatalf("\n * DEBUG: ER: %v\n", er)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mapOfFromNodeCommands := make(map[Node]map[command]struct{})
|
mapOfFromNodeCommands := make(map[Node]map[command]struct{})
|
||||||
|
@ -153,8 +155,6 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : cbor unmarshal failed: %v, message: %v", err, message)
|
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : cbor unmarshal failed: %v, message: %v", err, message)
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
log.Fatalf("\n * DEBUG: ER: %v\n", er)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ---
|
// ---
|
||||||
|
@ -102,17 +101,21 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
||||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||||
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
||||||
|
|
||||||
fmt.Printf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data)
|
er := fmt.Errorf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
// Check if the received hash is the same as the one currently active,
|
// Check if the received hash is the same as the one currently active,
|
||||||
if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
|
if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
|
||||||
fmt.Printf("\n --- methodREQKeysRequestUpdate: NODE AND CENTRAL HAVE EQUAL KEYS, NOTHING TO DO, EXITING HANDLER\n\n")
|
er := fmt.Errorf("info: methodREQKeysRequestUpdate: NODE AND CENTRAL HAVE EQUAL KEYS, NOTHING TO DO, EXITING HANDLER")
|
||||||
|
proc.errorKernel.infoSend(proc, message, er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("\n ------------methodREQKeysRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL KEYS, PREPARING TO SEND NEW VERSION OF KEYS\n\n")
|
er = fmt.Errorf("info: methodREQKeysRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL KEYS, PREPARING TO SEND NEW VERSION OF KEYS")
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
fmt.Printf(" * methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v\n\n", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
|
er = fmt.Errorf("info: methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
||||||
|
|
||||||
|
@ -120,7 +123,8 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
||||||
er := fmt.Errorf("error: methodREQKeysRequestUpdate, failed to marshal keys map: %v", err)
|
er := fmt.Errorf("error: methodREQKeysRequestUpdate, failed to marshal keys map: %v", err)
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
}
|
}
|
||||||
fmt.Printf("\n ----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v\n", message.FromNode)
|
er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
newReplyMessage(proc, message, b)
|
newReplyMessage(proc, message, b)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -181,7 +185,8 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", keysAndHash)
|
er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
// If the received map was empty we also want to delete all the locally stored keys,
|
// If the received map was empty we also want to delete all the locally stored keys,
|
||||||
// else we copy the marshaled keysAndHash we received from central into our map.
|
// else we copy the marshaled keysAndHash we received from central into our map.
|
||||||
|
@ -316,8 +321,9 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
|
||||||
// nodesAcked map since it will contain the nodes that were deleted so we are
|
// nodesAcked map since it will contain the nodes that were deleted so we are
|
||||||
// also able to send an update to them as well.
|
// also able to send an update to them as well.
|
||||||
func pushKeys(proc process, message Message, nodes []Node) error {
|
func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
fmt.Printf(" \n\n\n ************** beginning of pushKeys, nodes=%v\n", nodes)
|
er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes)
|
||||||
var knh []byte
|
var knh []byte
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
err := func() error {
|
err := func() error {
|
||||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||||
|
@ -343,7 +349,8 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
|
|
||||||
// For all nodes that is not ack'ed we try to send an update once.
|
// For all nodes that is not ack'ed we try to send an update once.
|
||||||
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
|
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
|
||||||
fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n)
|
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
msg := Message{
|
msg := Message{
|
||||||
ToNode: n,
|
ToNode: n,
|
||||||
Method: REQKeysDeliverUpdate,
|
Method: REQKeysDeliverUpdate,
|
||||||
|
@ -359,7 +366,8 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
fmt.Printf("\n ----> methodREQKeysAllow: SENDING KEYS TO NODE=%v\n", message.FromNode)
|
er = fmt.Errorf("----> methodREQKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the data payload of the current allowed keys.
|
// Create the data payload of the current allowed keys.
|
||||||
|
@ -386,7 +394,8 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
|
|
||||||
// For all nodes that is ack'ed we try to send an update once.
|
// For all nodes that is ack'ed we try to send an update once.
|
||||||
for n := range nodeMap {
|
for n := range nodeMap {
|
||||||
fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n)
|
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
msg := Message{
|
msg := Message{
|
||||||
ToNode: n,
|
ToNode: n,
|
||||||
Method: REQKeysDeliverUpdate,
|
Method: REQKeysDeliverUpdate,
|
||||||
|
@ -403,7 +412,8 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
log.Printf("\n ----> methodREQKeysAllow: sending keys update to node=%v\n", message.FromNode)
|
er = fmt.Errorf("----> methodREQKeysAllow: sending keys update to node=%v", message.FromNode)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -449,12 +459,14 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
|
||||||
// of doing it for each node delete.
|
// of doing it for each node delete.
|
||||||
|
|
||||||
proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs)
|
proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs)
|
||||||
log.Printf(" * DEBUG Deleted public keys: %v\n", message.MethodArgs)
|
er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs)
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
// All new elements are now added, and we can create a new hash
|
// All new elements are now added, and we can create a new hash
|
||||||
// representing the current keys in the allowed map.
|
// representing the current keys in the allowed map.
|
||||||
proc.centralAuth.updateHash(proc, message)
|
proc.centralAuth.updateHash(proc, message)
|
||||||
log.Printf(" * DEBUG updated hash for public keys\n")
|
er = fmt.Errorf(" * DEBUG updated hash for public keys")
|
||||||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||||
|
|
||||||
var nodes []Node
|
var nodes []Node
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue