diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index 13e4338..8763965 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -119,7 +119,8 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { c.pki.nodesAcked.mu.Unlock() if ok && bytes.Equal(existingKey, msg.Data) { - fmt.Printf(" \n * public key value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode) + er := fmt.Errorf("info: public key value for REGISTERED node %v is the same, doing nothing", msg.FromNode) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) return } @@ -155,10 +156,13 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string } }() - c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes) + err := c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes) + if err != nil { + proc.errorKernel.errSend(proc, msg, err) + } er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) - fmt.Printf(" * %v\n", er) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) c.pki.errorKernel.infoSend(proc, msg, er) } @@ -188,7 +192,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string // return value, err // } -//dbUpdatePublicKey will update the public key for a node in the db. +// dbUpdatePublicKey will update the public key for a node in the db. func (p *pki) dbUpdatePublicKey(node string, value []byte) error { err := p.db.Update(func(tx *bolt.Tx) error { //Create a bucket @@ -218,7 +222,9 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error { for _, n := range nodes { err := bu.Delete([]byte(n)) if err != nil { - log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err) + er := fmt.Errorf("error: delete key in bucket %v failed: %v", bucket, err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + return er } } @@ -228,7 +234,7 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error { return err } -//dbUpdateHash will update the public key for a node in the db. +// dbUpdateHash will update the public key for a node in the db. func (p *pki) dbUpdateHash(hash []byte) error { err := p.db.Update(func(tx *bolt.Tx) error { //Create a bucket diff --git a/process.go b/process.go index 1a1d2e2..669229d 100644 --- a/process.go +++ b/process.go @@ -253,7 +253,8 @@ func (p process) spawnWorker() { p.processes.active.procNames[p.processName] = p p.processes.active.mu.Unlock() - log.Printf("Successfully started process: %v\n", p.processName) + er := fmt.Errorf("Successfully started process: %v\n", p.processName) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } var ( diff --git a/processes.go b/processes.go index b6a073c..1076fec 100644 --- a/processes.go +++ b/processes.go @@ -720,7 +720,8 @@ func (p *processes) printProcessesMap() { p.active.mu.Lock() for pName, proc := range p.active.procNames { - log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name()) + er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name()) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) diff --git a/requests_acl.go b/requests_acl.go index 019b74e..2571f20 100644 --- a/requests_acl.go +++ b/requests_acl.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "log" "github.com/fxamacker/cbor/v2" ) @@ -53,19 +52,23 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s proc.centralAuth.accessLists.schemaGenerated.mu.Lock() defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock() - log.Printf(" ---- subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v\n", message.FromNode, message.Data) + er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v", message.FromNode, message.Data) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // Check if the received hash is the same as the one currently active, // If it is the same we exit the handler immediately. hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash hash := hash32[:] - log.Printf("---- subscriber methodREQAclRequestUpdate: the central acl hash=%v\n", hash32) + er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: the central acl hash=%v", hash32) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) if bytes.Equal(hash, message.Data) { - log.Printf("---- subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER\n") + er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER") + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) return } - log.Printf("---- subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl\n") + er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl") + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // Generate JSON for Message.Data @@ -79,10 +82,10 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s if err != nil { er := fmt.Errorf("error: REQAclRequestUpdate : json marshal failed: %v, message: %v", err, message) proc.errorKernel.errSend(proc, message, er) - log.Fatalf("%v\n", er) } - fmt.Printf(" ----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v\n", message.FromNode, hdh) + er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) newReplyMessage(proc, message, js) }() @@ -143,7 +146,6 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s if err != nil { er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : json unmarshal failed: %v, message: %v", err, message) proc.errorKernel.errSend(proc, message, er) - log.Fatalf("\n * DEBUG: ER: %v\n", er) } mapOfFromNodeCommands := make(map[Node]map[command]struct{}) @@ -153,8 +155,6 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s if err != nil { er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : cbor unmarshal failed: %v, message: %v", err, message) proc.errorKernel.errSend(proc, message, er) - log.Fatalf("\n * DEBUG: ER: %v\n", er) - } } diff --git a/requests_keys.go b/requests_keys.go index ce67cc4..fd82ab8 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "log" ) // --- @@ -102,17 +101,21 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node proc.centralAuth.pki.nodesAcked.mu.Lock() defer proc.centralAuth.pki.nodesAcked.mu.Unlock() - fmt.Printf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data) + er := fmt.Errorf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // Check if the received hash is the same as the one currently active, if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) { - fmt.Printf("\n --- methodREQKeysRequestUpdate: NODE AND CENTRAL HAVE EQUAL KEYS, NOTHING TO DO, EXITING HANDLER\n\n") + er := fmt.Errorf("info: methodREQKeysRequestUpdate: NODE AND CENTRAL HAVE EQUAL KEYS, NOTHING TO DO, EXITING HANDLER") + proc.errorKernel.infoSend(proc, message, er) return } - fmt.Printf("\n ------------methodREQKeysRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL KEYS, PREPARING TO SEND NEW VERSION OF KEYS\n\n") + er = fmt.Errorf("info: methodREQKeysRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL KEYS, PREPARING TO SEND NEW VERSION OF KEYS") + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) - fmt.Printf(" * methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v\n\n", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash) + er = fmt.Errorf("info: methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash) @@ -120,7 +123,8 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node er := fmt.Errorf("error: methodREQKeysRequestUpdate, failed to marshal keys map: %v", err) proc.errorKernel.errSend(proc, message, er) } - fmt.Printf("\n ----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v\n", message.FromNode) + er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) newReplyMessage(proc, message, b) }() } @@ -181,7 +185,8 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node proc.errorKernel.errSend(proc, message, er) } - fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", keysAndHash) + er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // If the received map was empty we also want to delete all the locally stored keys, // else we copy the marshaled keysAndHash we received from central into our map. @@ -316,8 +321,9 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string) // nodesAcked map since it will contain the nodes that were deleted so we are // also able to send an update to them as well. func pushKeys(proc process, message Message, nodes []Node) error { - fmt.Printf(" \n\n\n ************** beginning of pushKeys, nodes=%v\n", nodes) + er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes) var knh []byte + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) err := func() error { proc.centralAuth.pki.nodesAcked.mu.Lock() @@ -343,7 +349,8 @@ func pushKeys(proc process, message Message, nodes []Node) error { // For all nodes that is not ack'ed we try to send an update once. for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap { - fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n) + er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) msg := Message{ ToNode: n, Method: REQKeysDeliverUpdate, @@ -359,7 +366,8 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.toRingbufferCh <- []subjectAndMessage{sam} - fmt.Printf("\n ----> methodREQKeysAllow: SENDING KEYS TO NODE=%v\n", message.FromNode) + er = fmt.Errorf("----> methodREQKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Create the data payload of the current allowed keys. @@ -386,7 +394,8 @@ func pushKeys(proc process, message Message, nodes []Node) error { // For all nodes that is ack'ed we try to send an update once. for n := range nodeMap { - fmt.Printf("\n\n\n ************** DEBUG: node to send REQKeysDeliverUpdate to:%v\n ", n) + er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) msg := Message{ ToNode: n, Method: REQKeysDeliverUpdate, @@ -403,7 +412,8 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.toRingbufferCh <- []subjectAndMessage{sam} - log.Printf("\n ----> methodREQKeysAllow: sending keys update to node=%v\n", message.FromNode) + er = fmt.Errorf("----> methodREQKeysAllow: sending keys update to node=%v", message.FromNode) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } return nil @@ -449,12 +459,14 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string) // of doing it for each node delete. proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs) - log.Printf(" * DEBUG Deleted public keys: %v\n", message.MethodArgs) + er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // All new elements are now added, and we can create a new hash // representing the current keys in the allowed map. proc.centralAuth.updateHash(proc, message) - log.Printf(" * DEBUG updated hash for public keys\n") + er = fmt.Errorf(" * DEBUG updated hash for public keys") + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) var nodes []Node