mirror of https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00

debug log with slog format

This commit is contained in:
parent 44027c66c6
commit 060d8b135a

16 changed files with 123 additions and 255 deletions
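This commit replaces the old debug-logging pattern, where an error value was first built with fmt.Errorf and then handed to errorKernel.logDebug, with direct structured calls that pass a message plus key/value pairs in the log/slog style. Below is a minimal sketch of the before/after pattern; the errorKernel type is reduced to a stub and the event values are made up for illustration — only the logDebug shape and the slog call mirror the diff.

```go
package main

import "log/slog"

// Stub stand-in for the repo's errorKernel; the real type carries
// configuration, channels and error-handling state.
type errorKernel struct{}

// New-style logDebug after this commit: a message plus slog key/value args.
func (e *errorKernel) logDebug(msg string, args ...any) {
	slog.Debug(msg, args...)
}

func main() {
	// Make debug records visible for this sketch (Go 1.22+).
	slog.SetLogLoggerLevel(slog.LevelDebug)
	e := &errorKernel{}

	// Old pattern, removed throughout this commit:
	//   er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", name, op)
	//   e.logDebug(er)

	// New pattern, added throughout this commit (values here are made up):
	e.logDebug("readFolder: got file event", "name", "readfolder/msg2.yaml", "op", "CREATE")
}
```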
2 TODO.md
@@ -15,3 +15,5 @@ bool flags with default value set to "false" becomes "true" if false is set.
 Remove these error logs:
 
 `level=WARN msg="Thu Jan 9 12:14:24 2025, node: btdev1, error: readFolder: failed to open readFile from readFolder: open readfolder/msg2.yaml: no such file or directory\n"`
+
+Remove httpGetScheduled
@@ -322,8 +322,7 @@ func (c *centralAuth) generateACLsForAllNodes() error {
 ap.parse()
 }
 
-inf := fmt.Errorf("generateACLsFor all nodes, ACLsToConvert contains: %#v", c.accessLists.schemaGenerated.ACLsToConvert)
-c.accessLists.errorKernel.logDebug(inf)
+c.accessLists.errorKernel.logDebug("generateACLsFor all nodes", "ACLsToConvert", c.accessLists.schemaGenerated.ACLsToConvert)
 
 // ACLsToConvert got the complete picture of what ACL's that
 // are defined for each individual host node.
@@ -373,8 +372,7 @@ func (c *centralAuth) generateACLsForAllNodes() error {
 }
 }()
 
-inf = fmt.Errorf("generateACLsFor all nodes, GeneratedACLsMap contains: %#v", c.accessLists.schemaGenerated.GeneratedACLsMap)
-c.accessLists.errorKernel.logDebug(inf)
+c.accessLists.errorKernel.logDebug("generateACLsForAllNodes:", "GeneratedACLsMap", c.accessLists.schemaGenerated.GeneratedACLsMap)
 
 return nil
 }
@@ -80,8 +80,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
 // Open the database file for persistent storage of public keys.
 db, err := bolt.Open(databaseFilepath, 0660, nil)
 if err != nil {
-er := fmt.Errorf("newPKI: error: failed to open db: %v", err)
-errorKernel.logDebug(er)
+errorKernel.logDebug("newPKI: error: failed to open db", "error", err)
 return &p
 }
 
@@ -90,16 +89,14 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
 // Get public keys from db storage.
 keys, err := p.dbDumpPublicKey()
 if err != nil {
-er := fmt.Errorf("newPKI: dbPublicKeyDump failed, probably empty db: %v", err)
-errorKernel.logDebug(er)
+errorKernel.logDebug("newPKI: dbPublicKeyDump failed, probably empty db", "error", err)
 }
 
 // Only assign from storage to in memory map if the storage contained any values.
 if keys != nil {
 p.nodesAcked.keysAndHash.Keys = keys
 for k, v := range keys {
-er := fmt.Errorf("newPKI: public keys db contains: %v, %v", k, []byte(v))
-errorKernel.logDebug(er)
+errorKernel.logDebug("newPKI: public keys db contains", "key", k, "value", []byte(v))
 }
 }
 
@@ -127,8 +124,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
 c.pki.nodesAcked.mu.Unlock()
 
 if ok && bytes.Equal(existingKey, msg.Data) {
-er := fmt.Errorf("info: public key value for registered node %v is the same, doing nothing", msg.FromNode)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("addPublicKey: public key value for registered node is the same, doing nothing", "node", msg.FromNode)
 return
 }
 
@@ -166,36 +162,9 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string
 }
 
 er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
-proc.errorKernel.logDebug(er)
 c.pki.errorKernel.infoSend(proc, msg, er)
 }
 
-// // dbGetPublicKey will look up and return a specific value if it exists for a key in a bucket in a DB.
-// func (c *centralAuth) dbGetPublicKey(node string) ([]byte, error) {
-// var value []byte
-// // View is a help function to get values out of the database.
-// err := c.db.View(func(tx *bolt.Tx) error {
-// //Open a bucket to get key's and values from.
-// bu := tx.Bucket([]byte(c.bucketNamePublicKeys))
-// if bu == nil {
-// log.Printf("info: no db bucket exist: %v\n", c.bucketNamePublicKeys)
-// return nil
-// }
-//
-// v := bu.Get([]byte(node))
-// if len(v) == 0 {
-// log.Printf("info: view: key not found\n")
-// return nil
-// }
-//
-// value = v
-//
-// return nil
-// })
-//
-// return value, err
-// }
-
 // dbUpdatePublicKey will update the public key for a node in the db.
 func (p *pki) dbUpdatePublicKey(node string, value []byte) error {
 err := p.db.Update(func(tx *bolt.Tx) error {
@@ -227,7 +196,6 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error {
 err := bu.Delete([]byte(n))
 if err != nil {
 er := fmt.Errorf("error: delete key in bucket %v failed: %v", bucket, err)
-p.errorKernel.logDebug(er)
 return er
 }
 }
@@ -262,7 +262,7 @@ func (e *errorKernel) errSend(proc process, msg Message, err error, logLevel log
 case logWarning:
 e.logWarn("warn", err)
 case logDebug:
-e.logDebug(err)
+e.logDebug("debug", err)
 }
 }
 
@@ -294,10 +294,8 @@ func (e *errorKernel) logWarn(msg string, args ...any) {
 }
 
 // TODO: Make this into structured logging
-func (e *errorKernel) logDebug(err error) {
-if e.configuration.LogLevel == string(logDebug) {
-slog.Debug("debug", err.Error())
-}
+func (e *errorKernel) logDebug(msg string, args ...any) {
+slog.Debug(msg, args...)
 }
 
 // errorAction is used to tell the process who sent the error
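The hunk above is the core of the change: logDebug now takes a message and variadic key/value arguments and forwards them to slog.Debug, and the explicit `if e.configuration.LogLevel == string(logDebug)` guard is gone. One way to keep that filtering with the new shape is to set the level on the slog handler instead; the sketch below is an assumption about wiring that this diff does not show, and the "debug" config string is a guess.

```go
package main

import (
	"log/slog"
	"os"
)

// newLogger moves the level check that the old logDebug performed into the
// slog handler itself. The "debug" value is a guessed config string; the
// real Configuration.LogLevel handling is not part of this diff.
func newLogger(logLevel string) *slog.Logger {
	level := slog.LevelInfo
	if logLevel == "debug" {
		level = slog.LevelDebug
	}
	return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))
}

func main() {
	slog.SetDefault(newLogger("debug"))
	// With the default handler filtering by level, the new logDebug body,
	// slog.Debug(msg, args...), becomes a no-op unless debug logging is enabled.
	slog.Debug("newPKI: error: failed to open db", "error", "permission denied")
}
```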
@@ -339,8 +339,7 @@ func (s *server) readFolder() {
 
 if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
 time.Sleep(time.Millisecond * 250)
-er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op)
-s.errorKernel.logDebug(er)
+s.errorKernel.logDebug("readFolder: got file event", "name", event.Name, "op", event.Op)
 
 func() {
 fh, err := os.Open(event.Name)
@@ -384,16 +383,14 @@ func (s *server) readFolder() {
 if messages[i].JetstreamToNode != "" {
 
 s.jetstreamPublishCh <- messages[i]
-er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", messages)
-s.errorKernel.logDebug(er)
+s.errorKernel.logDebug("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh", "messages", messages)
 
 continue
 }
 
 s.newMessagesCh <- messages[i]
 
-er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", messages)
-s.errorKernel.logDebug(er)
+s.errorKernel.logDebug("readFolder: read new message in readfolder and putting it on s.samToSendCh", "messages", messages)
 }
 
 // Send the SAM struct to be picked up by the ring buffer.
@@ -108,8 +108,7 @@ func (n *nodeAcl) loadFromFile() error {
 if _, err := os.Stat(n.filePath); os.IsNotExist(err) {
 // Just logging the error since it is not crucial that a key file is missing,
 // since a new one will be created on the next update.
-er := fmt.Errorf("acl: loadFromFile: no acl file found at %v", n.filePath)
-n.errorKernel.logDebug(er)
+n.errorKernel.logDebug("nodeAcl:loadFromFile: no acl file found", "file", n.filePath)
 return nil
 }
 
@@ -131,8 +130,7 @@ func (n *nodeAcl) loadFromFile() error {
 return err
 }
 
-er := fmt.Errorf("nodeAcl: loadFromFile: Loaded existing acl's from file: %v", n.aclAndHash.Hash)
-n.errorKernel.logDebug(er)
+n.errorKernel.logDebug("nodeAcl: loadFromFile: Loaded existing acl's from file", "hash", n.aclAndHash.Hash)
 
 return nil
 }
@@ -234,8 +232,7 @@ func (p *publicKeys) loadFromFile() error {
 return err
 }
 
-er := fmt.Errorf("nodeAuth: loadFromFile: Loaded existing keys from file: %v", p.keysAndHash.Hash)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("nodeAuth: loadFromFile: Loaded existing keys from file", "hash", p.keysAndHash.Hash)
 
 return nil
 }
97 process.go
@@ -164,8 +164,7 @@ func (p process) start() {
 p.processes.active.procNames[p.processName] = p
 p.processes.active.mu.Unlock()
 
-er := fmt.Errorf("successfully started process: %v", p.processName)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("successfully started process", "processName", p.processName)
 }
 
 func (p process) startSubscriber() {
@@ -202,14 +201,15 @@ func (p process) startSubscriber() {
 delete(p.processes.active.procNames, p.processName)
 p.processes.active.mu.Unlock()
 
-er := fmt.Errorf("successfully stopped process: %v", p.processName)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("successfully stopped process", "processName", p.processName)
 
 }()
 }
 
 var (
 ErrACKSubscribeRetry = errors.New("ctrl: retrying to subscribe for ack message")
+// TODO: Other errors are not used for anything other than printing.
+ErrOther = errors.New("other error")
 )
 
 // publishNats will create the Nats message with headers and payload.
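Besides the structured-logging change, the hunk above introduces an ErrOther sentinel; later hunks in publishNats return ErrOther where the code previously returned the freshly formatted error. The snippet below is a hedged, caller-side sketch of why a sentinel is convenient — it is not code from this commit, and tryPublish is a made-up stand-in.

```go
package main

import "errors"

var (
	ErrACKSubscribeRetry = errors.New("ctrl: retrying to subscribe for ack message")
	ErrOther             = errors.New("other error")
)

// tryPublish stands in for publishNats, which after this commit reports
// failures through fixed sentinels while the details go to logDebug.
func tryPublish(ok bool) error {
	if ok {
		return nil
	}
	return ErrOther
}

func main() {
	switch err := tryPublish(false); {
	case err == nil:
		// delivered
	case errors.Is(err, ErrACKSubscribeRetry):
		// retry the publish/ACK loop
	case errors.Is(err, ErrOther):
		// give up; the cause was already logged via logDebug
	}
}
```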
@@ -239,8 +239,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 Header: natsMsgHeader,
 }
 
-er := fmt.Errorf("info: preparing to send nats message with subject %v, id: %v", msg.Subject, message.ID)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: preparing to send nats message", "subject", msg.Subject, "id", message.ID)
 
 var err error
 
@@ -251,8 +250,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 err = func() error {
 err := natsConn.PublishMsg(msg)
 if err != nil {
-er := fmt.Errorf("error: nats publish for message with subject failed: %v", err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: nats publish for message with subject failed", "error", err)
 return ErrACKSubscribeRetry
 }
 p.metrics.promNatsDeliveredTotal.Inc()
@@ -286,8 +284,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 return nil
 }
 
-er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: ", "retryAttempts", retryAttempts, "retries", message.Retries, "ACKTimeout", message.ACKTimeout, "ID", message.ID, "method", message.Method, "toNode", message.ToNode)
 
 // The SubscribeSync used in the subscriber, will get messages that
 // are sent after it started subscribing.
@@ -297,15 +294,13 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 defer func() {
 err := subReply.Unsubscribe()
 if err != nil {
-er := fmt.Errorf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v", err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("nats SubscribeSync: failed when unsubscribing for ACK", "error", err)
 }
 }()
 if err != nil {
 er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
 // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-er = fmt.Errorf("%v, waiting equal to RetryWait %ds before retrying", er, message.RetryWait)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: waiting equal to RetryWait before retrying", "error", er, "RetryWait", message.RetryWait)
 
 time.Sleep(time.Second * time.Duration(message.RetryWait))
 
@@ -315,9 +310,8 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 // Publish message
 err = natsConn.PublishMsg(msg)
 if err != nil {
-er := fmt.Errorf("error: nats publish failed: %v, waiting equal to RetryWait of %ds before retrying", err, message.RetryWait)
+p.errorKernel.logDebug("publishNats: nats publish failed, waiting equal to RetryWait before retrying", "error", err, "RetryWait", message.RetryWait)
 // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-p.errorKernel.logDebug(er)
 time.Sleep(time.Second * time.Duration(message.RetryWait))
 
 return ErrACKSubscribeRetry
@@ -334,8 +328,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 
 switch {
 case err == nats.ErrNoResponders || err == nats.ErrTimeout:
-er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, subject.name(), err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: ack receive failed: waiting before retrying", "seconds", message.RetryWait, "subject", subject.name(), "error", err)
 
 time.Sleep(time.Second * time.Duration(message.RetryWait))
 p.metrics.promNatsMessagesMissedACKsTotal.Inc()
@@ -343,16 +336,14 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 return ErrACKSubscribeRetry
 
 case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
-er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", subject.name(), err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: ack receive failed: conneciton closed or bad subscription, will not retry message", "subject", subject.name(), "error", err)
 
-return er
+return ErrOther
 
 default:
-er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", subject.name(), err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type", "subject", subject.name(), "error", err)
 
-return er
+return ErrOther
 }
 
 }
@@ -374,8 +365,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
 // Message were delivered successfully.
 p.metrics.promNatsDeliveredTotal.Inc()
 
-er = fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishNats: sent message", "subject", msg.Subject, "ID", message.ID)
 
 return
 }
@@ -402,8 +392,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
 
 // If debugging is enabled, print the source node name of the nats messages received.
 if val, ok := msg.Header["fromNode"]; ok {
-er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("nats message received from", "node", val, "subject", subject)
 }
 
 message, err := p.server.messageDeserializeAndUncompress(msgData)
@@ -431,8 +420,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
 
 // Check for ACK type Event.
 case message.ACKTimeout >= 1:
-er := fmt.Errorf("subscriberHandler: received ACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("messageSubscriberHandler: received ACK message from", "method", message.Method, "fromNode", message.FromNode, "ID", message.ID)
 // When spawning sub processes we can directly assign handlers to the process upon
 // creation. We here check if a handler is already assigned, and if it is nil, we
 // lookup and find the correct handler to use if available.
@@ -456,8 +444,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
 natsConn.Publish(msg.Reply, []byte{})
 
 case message.ACKTimeout < 1:
-er := fmt.Errorf("subscriberHandler: received NACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("messageSubscriberHandler: received NACK message from", ",method", message.Method, "fromNode", message.FromNode, "ID", message.ID)
 // When spawning sub processes we can directly assign handlers to the process upon
 // creation. We here check if a handler is already assigned, and if it is nil, we
 // lookup and find the correct handler to use if available.
@@ -490,16 +477,13 @@ func (p process) callHandler(message Message, thisNode string) {
 // Call the handler if ACL/signature checking returns true.
 go func() {
 conf := p.nodeAuth.configuration
-var er error
 
-er = fmt.Errorf("callhandler: got message from: %v, method: %v, EnableSignatureCheck=%v, EnableAclCheck=%v", message.FromNode, message.Method, conf.EnableSignatureCheck, conf.EnableAclCheck)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: checking how to call message", "fromNode", message.FromNode, "method", message.Method, "EnableSignatureCheck", conf.EnableSignatureCheck, "EnableAclCheck", conf.EnableAclCheck)
 
 switch {
 // If no checking enabled we should just allow the message.
 case !conf.EnableSignatureCheck && !conf.EnableAclCheck:
-er := fmt.Errorf("NO CHECK OF SIG OR ACL FLAG ENABLED, EXECUTING HANDLER: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: NO CHECK OF SIG OR ACL FLAG ENABLED, EXECUTING HANDLER", "method", message.Method)
 
 executeHandler(p, message, thisNode)
 return
@@ -507,44 +491,35 @@ func (p process) callHandler(message Message, thisNode string) {
 // If only sig check enabled, and sig OK, we should allow the message.
 case conf.EnableSignatureCheck && !conf.EnableAclCheck:
 sigOK := p.nodeAuth.verifySignature(message)
-er = fmt.Errorf("CHECK SIG TRUE: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG TRUE", "method", message.Method)
 
 if sigOK {
-er = fmt.Errorf("CHECK SIG TRUE EVALUATED TO TRUE, EXECUTING HANDLER: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG TRUE EVALUATED TO TRUE, EXECUTING HANDLER", "method", message.Method)
 
 executeHandler(p, message, thisNode)
 return
 }
-er = fmt.Errorf("CHECK SIG TRUE EVALUATED TO FALSE: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG TRUE EVALUATED TO FALSE", "method", message.Method)
 
 // If both sig and acl check enabled, and sig and acl OK, we should allow the message.
 case conf.EnableSignatureCheck && conf.EnableAclCheck:
 sigOK := p.nodeAuth.verifySignature(message)
 aclOK := p.nodeAuth.verifyAcl(message)
-er = fmt.Errorf("CHECK SIG AND ACK TRUE: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG AND ACK TRUE", "method", message.Method)
 
 if sigOK && aclOK {
-er = fmt.Errorf("CHECK SIG AND ACK TRUE EVALUATED TO FALSE, EXECUTING HANDLER: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG AND ACK TRUE EVALUATED TO FALSE, EXECUTING HANDLER", "method", message.Method)
 
 executeHandler(p, message, thisNode)
 return
 }
-er = fmt.Errorf("CHECK SIG AND ACK TRUE EVALUATED TO FALSE: %v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callhandler: CHECK SIG AND ACK TRUE EVALUATED TO FALSE", "method", message.Method)
 
 default:
-er = fmt.Errorf("callHandler: None of the verify flags matched, not doing handler for message, method=%v", message.Method)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("callHandler: None of the verify flags matched, not doing handler for message", "method", message.Method)
 }
 
-p.errorKernel.logDebug(er)
-
-er = fmt.Errorf("error: subscriberHandler: ACL or Signature were verified not-OK, doing nothing")
+er := fmt.Errorf("error: subscriberHandler: ACL or Signature were verified not-OK, doing nothing")
 p.errorKernel.errSend(p, message, er, logWarning)
 
 }()
@@ -602,16 +577,14 @@ func executeHandler(p process, message Message, thisNode string) {
 for {
 select {
 case <-p.ctx.Done():
-er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("executeHandler: proc ctx done", "toNode", message.ToNode, "fromNode", message.FromNode, "method", message.Method, "methodArgs", message.MethodArgs)
 
 //cancel()
 return
 case <-totalTimeTicker.C:
 // Total time reached. End the process.
 //cancel()
-er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("executeHandler: schedule totalTime done", "toNode", message.ToNode, "fromNode", message.FromNode, "method", message.Method, "methodArgs", message.MethodArgs)
 
 return
 
@@ -641,8 +614,7 @@ func (p process) startNatsSubscriber() *nats.Subscription {
 go p.messageSubscriberHandler(p.natsConn, p.configuration.NodeName, msg, subject)
 })
 if err != nil {
-er := fmt.Errorf("error: Subscribe failed: %v", err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("Subscribe failed", "error", err)
 return nil
 }
 
@@ -664,8 +636,7 @@ func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
 
 b, err := p.server.messageSerializeAndCompress(m)
 if err != nil {
-er := fmt.Errorf("error: publishAMessage: serialize and compress failed: %v", err)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("publishAMessage: serialize and compress failed", "error", err)
 return
 }
 
@@ -213,8 +213,7 @@ func newStartup(server *server) *startup {
 // startProcess will start a process. It takes the initial process, request method,
 // and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
 func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, proc process, procFuncCh chan Message) error) {
-er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("starting subscriber", "node", m, p.node)
 
 var sub Subject
 switch {
@@ -235,15 +234,13 @@ func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context,
 
 // Print the content of the processes map.
 func (p *processes) printProcessesMap() {
-er := fmt.Errorf("output of processes map : ")
-p.errorKernel.logDebug(er)
+p.errorKernel.logDebug("output of processes map : ")
 
 {
 p.active.mu.Lock()
 
 for pName, proc := range p.active.procNames {
-er := fmt.Errorf("info: proc - procName in map: %v , id: %v, subject: %v", pName, proc.processID, proc.subject.name())
-proc.errorKernel.logDebug(er)
+p.errorKernel.logDebug("process map", "name", pName, "ID", proc.processID, "subject", proc.subject.name())
 }
 
 p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
@@ -14,8 +14,7 @@ import (
 
 // Handler to get all acl's from a central server.
 func methodAclRequestUpdate(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- subscriber methodAclRequestUpdate received from: %v, hash data = %v", message.FromNode, message.Data)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- subscriber methodAclRequestUpdate received from node with hash", "fromNode", message.FromNode, "hash", message.Data)
 
 // fmt.Printf("\n --- subscriber methodAclRequestUpdate: the message brought to handler : %+v\n", message)
 
@@ -46,23 +45,20 @@ func methodAclRequestUpdate(proc process, message Message, node string) ([]byte,
 proc.centralAuth.accessLists.schemaGenerated.mu.Lock()
 defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock()
 
-er := fmt.Errorf("info: subscriber methodAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v", message.FromNode, message.Data)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("methodAclRequestUpdate: got acl hash from node with hash", "fromNode", message.FromNode, "hash", message.Data)
 
 // Check if the received hash is the same as the one currently active,
 // If it is the same we exit the handler immediately.
 hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash
 hash := hash32[:]
-er = fmt.Errorf("info: subscriber methodAclRequestUpdate: the central acl hash=%v", hash32)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("methodAclRequestUpdate: the central acl hash", "hash", hash32)
 if bytes.Equal(hash, message.Data) {
-er := fmt.Errorf("info: subscriber methodAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER")
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("info: subscriber methodAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER")
 return
 }
 
-er = fmt.Errorf("info: subscriber methodAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl")
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("info: subscriber methodAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl")
 
 // Generate JSON for Message.Data
 
@@ -78,8 +74,7 @@ func methodAclRequestUpdate(proc process, message Message, node string) ([]byte,
 proc.errorKernel.errSend(proc, message, er, logWarning)
 }
 
-er = fmt.Errorf("----> subscriber methodAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("----> subscriber methodAclRequestUpdate: SENDING ACL'S TO NODE", "node", message.FromNode, "serializedAndHash", hdh)
 
 newReplyMessage(proc, message, js)
 }()
@@ -100,8 +95,7 @@ func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan
 // and update with new keys back.
 
 proc.nodeAuth.nodeAcl.mu.Lock()
-er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug(" ----> publisher AclRequestUpdate: sending our current hash", "hash", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
 
 m := Message{
 FileName: "aclRequestUpdate.log",
@@ -121,9 +115,8 @@ func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan
 select {
 case <-ticker.C:
 case <-ctx.Done():
-er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
+proc.errorKernel.logDebug("stopped handleFunc for publisher", "subject", proc.subject.name())
 // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
-proc.errorKernel.logDebug(er)
 return nil
 }
 }
@@ -133,8 +126,7 @@ func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan
 
 // Handler to receive the acls from a central server.
 func methodAclDeliverUpdate(proc process, message Message, node string) ([]byte, error) {
-inf := fmt.Errorf("<--- subscriber methodAclDeliverUpdate received from: %v, containing: %v", message.FromNode, message.Data)
-proc.errorKernel.logDebug(inf)
+proc.errorKernel.logDebug("<--- subscriber methodAclDeliverUpdate received from", "fromNode", message.FromNode, "data", message.Data)
 
 // fmt.Printf("\n --- subscriber methodAclRequestUpdate: the message received on handler : %+v\n\n", message)
 
@@ -210,8 +202,7 @@ func methodAclDeliverUpdate(proc process, message Message, node string) ([]byte,
 // ---
 
 func methodAclAddCommand(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclAddCommand received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -272,8 +263,7 @@ func methodAclAddCommand(proc process, message Message, node string) ([]byte, er
 // ---
 
 func methodAclDeleteCommand(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclDeleteCommand received", "fromnode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -334,8 +324,7 @@ func methodAclDeleteCommand(proc process, message Message, node string) ([]byte,
 // ---
 
 func methodAclDeleteSource(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclDeleteSource received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclDeleteSource received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -395,8 +384,7 @@ func methodAclDeleteSource(proc process, message Message, node string) ([]byte,
 // ---
 
 func methodAclGroupNodesAddNode(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupNodesAddNode received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupNodesAddNode received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -456,8 +444,7 @@ func methodAclGroupNodesAddNode(proc process, message Message, node string) ([]b
 // ---
 
 func methodAclGroupNodesDeleteNode(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupNodesDeleteNode received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupNodesDeleteNode received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -517,8 +504,7 @@ func methodAclGroupNodesDeleteNode(proc process, message Message, node string) (
 // ---
 
 func methodAclGroupNodesDeleteGroup(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupNodesDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupNodesDeleteGroup received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -577,8 +563,7 @@ func methodAclGroupNodesDeleteGroup(proc process, message Message, node string)
 // ---
 
 func methodAclGroupCommandsAddCommand(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupCommandsAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupCommandsAddCommand received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -638,8 +623,7 @@ func methodAclGroupCommandsAddCommand(proc process, message Message, node string
 // ---
 
 func methodAclGroupCommandsDeleteCommand(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupCommandsDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupCommandsDeleteCommand received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -699,8 +683,7 @@ func methodAclGroupCommandsDeleteCommand(proc process, message Message, node str
 // ---
 
 func methodAclGroupCommandsDeleteGroup(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclGroupCommandsDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclGroupCommandsDeleteGroup received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -759,8 +742,7 @@ func methodAclGroupCommandsDeleteGroup(proc process, message Message, node strin
 // ---
 
 func methodAclExport(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclExport received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclExport received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -815,8 +797,7 @@ func methodAclExport(proc process, message Message, node string) ([]byte, error)
 // ---
 
 func methodAclImport(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- methodAclImport received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- methodAclImport received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 proc.processes.wg.Add(1)
 go func() {
@@ -15,8 +15,7 @@ import (
 // return the output of the command run back to the calling publisher
 // as a new message.
 func methodCliCommand(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- CLICommandREQUEST received", "fromNode", message.FromNode, "methodArgs", message.MethodArgs)
 
 msgForErrors := message
 msgForErrors.FileName = msgForErrors.FileName + ".error"
@@ -128,8 +127,7 @@ func methodCliCommand(proc process, message Message, node string) ([]byte, error
 // longer time and you want to send the output of the command continually
 // back as it is generated, and not just when the command is finished.
 func methodCliCommandCont(proc process, message Message, node string) ([]byte, error) {
-er := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
-proc.errorKernel.logDebug(er)
+proc.errorKernel.logDebug("<--- CLInCommandCont REQUEST received", "fromNode", message.FromNode, "methodArgs", message.Data)
 
 msgForErrors := message
 msgForErrors.FileName = msgForErrors.FileName + ".error"
@ -108,8 +108,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
// The value will be replaced
|
// The value will be replaced
|
||||||
folderPermission := uint64(0755)
|
folderPermission := uint64(0755)
|
||||||
|
|
||||||
er := fmt.Errorf("info: before switch: FolderPermission defined in message for socket: %04o", folderPermission)
|
proc.errorKernel.logDebug("methodCopySrc: before switch: FolderPermission defined in message for socket: %04o", "folderPermission", folderPermission)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
// Verify and check the methodArgs
|
// Verify and check the methodArgs
|
||||||
|
|
||||||
if len(message.MethodArgs) < 3 {
|
if len(message.MethodArgs) < 3 {
|
||||||
|
@ -146,8 +145,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
proc.errorKernel.logError("methodCopySrc: failed to parse uint", "error", err)
|
proc.errorKernel.logError("methodCopySrc: failed to parse uint", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
er := fmt.Errorf("info: FolderPermission defined in message for socket: %v, converted = %v", message.MethodArgs[5], folderPermission)
|
proc.errorKernel.logDebug("methodCopySrc: FolderPermission defined in message for socket", "socket", message.MethodArgs[5], "converted", folderPermission)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodCopySrc: unable to convert folderPermission into int value: %v", err)
|
er := fmt.Errorf("error: methodCopySrc: unable to convert folderPermission into int value: %v", err)
|
||||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||||
|
@ -184,8 +183,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
fileInfo, err := os.Stat(SrcFilePath)
|
fileInfo, err := os.Stat(SrcFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// errCh <- fmt.Errorf("error: methodCopySrc: failed to open file: %v, %v", SrcFilePath, err)
|
// errCh <- fmt.Errorf("error: methodCopySrc: failed to open file: %v, %v", SrcFilePath, err)
|
||||||
er := fmt.Errorf("error: copySrcSubProcFunc: failed to stat file: %v", err)
|
proc.errorKernel.logDebug("error: copySrcSubProcFunc: failed to stat", "file", err)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,8 +317,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
|
||||||
proc.processes.active.mu.Unlock()
|
proc.processes.active.mu.Unlock()
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
er := fmt.Errorf("methodCopyDst: subprocesses already existed, will not start another subscriber for %v", pn)
|
proc.errorKernel.logDebug("methodCopyDst: subprocesses already existed, will not start another subscriber for", "processName", pn)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
|
|
||||||
// HERE!!!
|
// HERE!!!
|
||||||
// If the process name already existed we return here before any
|
// If the process name already existed we return here before any
|
||||||
|
@ -361,11 +358,9 @@ func copySrcSubHandler() func(process, Message, string) ([]byte, error) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-proc.ctx.Done():
|
case <-proc.ctx.Done():
|
||||||
er := fmt.Errorf(" * copySrcHandler ended: %v", proc.processName)
|
proc.errorKernel.logDebug("copySrcHandler: process ended", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
case proc.procFuncCh <- message:
|
case proc.procFuncCh <- message:
|
||||||
er := fmt.Errorf("copySrcHandler: passing message over to procFunc: %v", proc.processName)
|
proc.errorKernel.logDebug("copySrcHandler: passing message over to procFunc", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -379,11 +374,10 @@ func copyDstSubHandler() func(process, Message, string) ([]byte, error) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-proc.ctx.Done():
|
case <-proc.ctx.Done():
|
||||||
er := fmt.Errorf(" * copyDstHandler ended: %v", proc.processName)
|
proc.errorKernel.logDebug("copyDstHandler: process ended", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
case proc.procFuncCh <- message:
|
case proc.procFuncCh <- message:
|
||||||
er := fmt.Errorf("copyDstHandler: passing message over to procFunc: %v", proc.processName)
|
proc.errorKernel.logDebug("copyDstHandler: passing message over to procFunc", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -464,8 +458,7 @@ func copySrcSubProcFunc(cia copyInitialData, cancel context.CancelFunc, initialM
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
er := fmt.Errorf(" info: canceling copySrcProcFunc : %v", proc.processName)
|
proc.errorKernel.logDebug("copySrcProcFunc: canceling procFunc", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
// Pick up the message recived by the copySrcSubHandler.
|
// Pick up the message recived by the copySrcSubHandler.
|
||||||
|
@ -678,8 +671,7 @@ func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.Can
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
er := fmt.Errorf(" * copyDstProcFunc ended: %v", proc.processName)
|
proc.errorKernel.logDebug("copyDstProcFunc: procFunc ended: %v", "processName", proc.processName)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
return nil
|
return nil
|
||||||
case message := <-procFuncCh:
|
case message := <-procFuncCh:
|
||||||
var csa copySubData
|
var csa copySubData
|
||||||
|
@ -694,8 +686,7 @@ func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.Can
|
||||||
// trigger the resend of the last message in the switch below.
|
// trigger the resend of the last message in the switch below.
|
||||||
hash := sha256.Sum256(csa.CopyData)
|
hash := sha256.Sum256(csa.CopyData)
|
||||||
if hash != csa.Hash {
|
if hash != csa.Hash {
|
||||||
er := fmt.Errorf("error: copyDstSubProcFunc: hash of received message is not correct for: %v", cia.DstMethod)
|
proc.errorKernel.logDebug("copyDstSubProcFunc: hash of received message is not correct for", "DstMethod", cia.DstMethod)
|
||||||
proc.errorKernel.logDebug(er)
|
|
||||||
|
|
||||||
csa.CopyStatus = copyResendLast
|
csa.CopyStatus = copyResendLast
|
||||||
}
|
}
|
||||||
|
@@ -785,16 +776,14 @@ func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.Can
 			filePath := filepath.Join(cia.DstDir, cia.DstFile)

 			// HERE:
-			er := fmt.Errorf("info: Before creating folder: cia.FolderPermission: %04o", cia.FolderPermission)
-			proc.errorKernel.logDebug(er)
+			proc.errorKernel.logDebug("copyDstSubProcFunc: Before creating folder", "FolderPermission", cia.FolderPermission)

 			if _, err := os.Stat(cia.DstDir); os.IsNotExist(err) {
 				err := os.MkdirAll(cia.DstDir, fs.FileMode(cia.FolderPermission))
 				if err != nil {
-					return fmt.Errorf("error: failed to create destination directory for file copying %v: %v", cia.DstDir, err)
+					return fmt.Errorf("copyDstSubProcFunc: failed to create destination directory for file copying %v: %v", cia.DstDir, err)
 				}
-				er := fmt.Errorf("info: Created folder: with cia.FolderPermission: %04o", cia.FolderPermission)
-				proc.errorKernel.logDebug(er)
+				proc.errorKernel.logDebug("copyDstSubProcFunc: Created folder", "folderPermission", cia.FolderPermission)
 			}

 			// Rename the file so we got a backup.
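The folder handling above converts the numeric FolderPermission carried in the copy request into an fs.FileMode before calling os.MkdirAll. A small standalone sketch of the same pattern, using placeholder values (the directory and permission below are examples, not values from ctrl):

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

func main() {
	// Placeholder values; in the handler they come from the copyInitialData message.
	dstDir := filepath.Join(os.TempDir(), "ctrl-copy-example")
	var folderPermission uint64 = 0o755

	if _, err := os.Stat(dstDir); os.IsNotExist(err) {
		// fs.FileMode is a numeric type, so the permission from the message
		// converts directly before being handed to MkdirAll.
		if err := os.MkdirAll(dstDir, fs.FileMode(folderPermission)); err != nil {
			fmt.Println("failed to create destination directory:", err)
			return
		}
		fmt.Printf("created %v with permission %04o\n", dstDir, folderPermission)
	}
}
```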
@@ -892,8 +881,7 @@ func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.Can
 				proc.errorKernel.errSend(proc, message, er, logWarning)
 			}

-			er = fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath)
-			proc.errorKernel.logDebug(er)
+			proc.errorKernel.logDebug("copyDstSubProcFunc: copy: successfully wrote all split chunk files into", "file", filePath)

 			// Signal back to src that we are done, so it can cancel the process.
 			{
@@ -23,8 +23,7 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
 	// data to the socket instead of writing it to a normal file.
 	fi, err := os.Stat(file)
 	if err == nil && !os.IsNotExist(err) {
-		er := fmt.Errorf("info: reqWriteFileOrSocket: failed to stat file, but will continue: %v", folderTree)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("reqWriteFileOrSocket: failed to stat file, but will continue", "folderTree", folderTree)
 	}

 	if fi != nil && fi.Mode().Type() == fs.ModeSocket {
@@ -53,8 +52,7 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
 			return er
 		}

-		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("reqWriteFileOrSocket: Creating subscribers data folder at", "folderTree", folderTree)
 	}

 	var fileFlag int
@@ -112,8 +110,7 @@ func methodToFile(proc process, message Message, node string) ([]byte, error) {
 // return the output of the command run back to the calling publisher
 // as a new message.
 func methodTailFile(proc process, message Message, node string) ([]byte, error) {
-	inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
-	proc.errorKernel.logDebug(inf)
+	proc.errorKernel.logDebug("<--- TailFile REQUEST received", "fromNode", message.FromNode, "data", message.Data)

 	proc.processes.wg.Add(1)
 	go func() {
@@ -11,8 +11,7 @@ import (

 // handler to do a Http Get.
 func methodHttpGet(proc process, message Message, node string) ([]byte, error) {
-	er := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
-	proc.errorKernel.logDebug(er)
+	proc.errorKernel.logDebug("<--- REQHttpGet received", "fromNode", message.FromNode, "data", message.Data)

 	msgForErrors := message
 	msgForErrors.FileName = msgForErrors.FileName + ".error"
@@ -112,8 +111,7 @@ func methodHttpGet(proc process, message Message, node string) ([]byte, error) {
 // handler to do a Http Get Scheduled.
 // The second element of the MethodArgs slice holds the timer defined in seconds.
 func methodHttpGetScheduled(proc process, message Message, node string) ([]byte, error) {
-	er := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
-	proc.errorKernel.logDebug(er)
+	proc.errorKernel.logDebug("<--- REQHttpGetScheduled received", "fromNode", message.FromNode, "data", message.Data)

 	proc.processes.wg.Add(1)
 	go func() {
@@ -88,22 +88,18 @@ func methodKeysUpdateRequest(proc process, message Message, node string) ([]byte
 		proc.centralAuth.pki.nodesAcked.mu.Lock()
 		defer proc.centralAuth.pki.nodesAcked.mu.Unlock()

-		er := fmt.Errorf(" <---- methodKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug(" <---- methodKeysRequestUpdate: received hash from node", "fromNode", message.FromNode, "data", message.Data)

 		// Check if the received hash is the same as the one currently active,
 		if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
-			er := fmt.Errorf("info: methodKeysRequestUpdate: node %v and central have equal keys, nothing to do, exiting key update handler", message.FromNode)
+			proc.errorKernel.logDebug("methodKeysUpdateRequest: node and central have equal keys, nothing to do, exiting key update handler", "fromNode", message.FromNode)
 			// proc.errorKernel.infoSend(proc, message, er)
-			proc.errorKernel.logDebug(er)
 			return
 		}

-		er = fmt.Errorf("info: methodKeysRequestUpdate: node %v and central had not equal keys, preparing to send new version of keys", message.FromNode)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodKeysUpdateRequest: node and central had not equal keys, preparing to send new version of keys", "fromNode", message.FromNode)

-		er = fmt.Errorf("info: methodKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodKeysUpdateRequest: marshalling new keys and hash to send", "keys", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, "hash", proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)

 		b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)

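The handler above only sends a new key map when the hash reported by the node differs from the hash central holds for the acked keys. A compact sketch of that comparison, with made-up values:

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

func main() {
	// Central's view: a hash over the serialized, acked public-key map.
	centralHash := sha256.Sum256([]byte(`{"node1":"key1","node2":"key2"}`))

	// What a node reports in message.Data when asking for an update.
	reported := centralHash[:]

	if bytes.Equal(centralHash[:], reported) {
		fmt.Println("node and central have equal keys, nothing to do")
	} else {
		fmt.Println("keys differ, marshalling and sending the new key map")
	}
}
```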
@@ -111,8 +107,7 @@ func methodKeysUpdateRequest(proc process, message Message, node string) ([]byte
 			er := fmt.Errorf("error: methodKeysRequestUpdate, failed to marshal keys map: %v", err)
 			proc.errorKernel.errSend(proc, message, er, logWarning)
 		}
-		er = fmt.Errorf("----> methodKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("----> methodKeysUpdateRequest: SENDING KEYS TO NODE", "node", message.FromNode)
 		newReplyMessage(proc, message, b)
 	}()
 }
@@ -135,8 +130,7 @@ func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh cha
 		// and update with new keys back.

 		proc.nodeAuth.publicKeys.mu.Lock()
-		er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug(" ----> publisher KeysRequestUpdate: sending our current hash", "hash", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))

 		m := Message{
 			FileName: "publickeysget.log",
@@ -156,9 +150,8 @@ func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh cha
 		select {
 		case <-ticker.C:
 		case <-ctx.Done():
-			er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
+			proc.errorKernel.logDebug("procFuncKeysRequestUpdate: stopped handleFunc for: publisher", "subject", proc.subject.name())
 			// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
-			proc.errorKernel.logDebug(er)
 			return nil
 		}
 	}
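This publisher loop, like the hello publisher further down, ticks on an interval and exits cleanly when its process context is cancelled. A standalone sketch of the pattern, with a placeholder interval and work function:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPublisher loops until the context is cancelled, doing work on each tick,
// mirroring the procFunc publisher loops in the hunks above and below.
func runPublisher(ctx context.Context, interval time.Duration, work func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			work()
		case <-ctx.Done():
			fmt.Println("stopped handleFunc for publisher")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	runPublisher(ctx, 100*time.Millisecond, func() {
		fmt.Println("sending key request update")
	})
}
```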
@@ -203,8 +196,7 @@ func methodKeysUpdateReceive(proc process, message Message, node string) ([]byte
 			proc.errorKernel.errSend(proc, message, er, logWarning)
 		}

-		er := fmt.Errorf("<---- methodKeysReceiveUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("<---- methodKeysUpdateReceive: after unmarshal, nodeAuth keysAndhash contains", "keysAndHash", keysAndHash)

 		// If the received map was empty we also want to delete all the locally stored keys,
 		// else we copy the marshaled keysAndHash we received from central into our map.
@@ -325,8 +317,7 @@ func methodKeysAllow(proc process, message Message, node string) ([]byte, error)
 // nodesAcked map since it will contain the nodes that were deleted so we are
 // also able to send an update to them as well.
 func pushKeys(proc process, message Message, nodes []Node) error {
-	er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes)
-	proc.errorKernel.logDebug(er)
+	proc.errorKernel.logDebug("pushKeys: beginning of pushKeys", "nodes", nodes)

 	proc.centralAuth.pki.nodesAcked.mu.Lock()
 	defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
@@ -343,8 +334,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {

 	// For all nodes that is not ack'ed we try to send an update once.
 	for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
-		er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("pushKeys: node to send REQKeysDeliverUpdate to", "node", n)
 		msg := Message{
 			ToNode: n,
 			Method: KeysUpdateReceive,
@@ -355,8 +345,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {

 		proc.newMessagesCh <- msg

-		er = fmt.Errorf("----> pushKeys: SENDING KEYS TO NODE=%v", message.FromNode)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("----> pushKeys: SENDING KEYS TO NODE", "node", message.FromNode)
 	}

 	// Concatenate the current nodes in the keysAndHash map and the nodes
@@ -372,8 +361,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {

 	// For all nodes that is ack'ed we try to send an update once.
 	for n := range nodeMap {
-		er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("pushKeys: node to send REQKeysDeliverUpdate to", "node", n)
 		msg := Message{
 			ToNode: n,
 			Method: KeysUpdateReceive,
@@ -384,8 +372,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {

 		proc.newMessagesCh <- msg

-		er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("----> methodKeysAllow: sending keys update to", "node", message.FromNode)
 	}

 	return nil
@@ -393,8 +380,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
 }

 func methodKeysDelete(proc process, message Message, node string) ([]byte, error) {
-	inf := fmt.Errorf("<--- methodKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs)
-	proc.errorKernel.logDebug(inf)
+	proc.errorKernel.logDebug("<--- methodKeysDelete received from", "node", message.FromNode, "methodArgs", message.MethodArgs)

 	proc.processes.wg.Add(1)
 	go func() {
@@ -423,14 +409,12 @@ func methodKeysDelete(proc process, message Message, node string) ([]byte, error
 		// of doing it for each node delete.

 		proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs)
-		er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodKeysDelete: Deleted public keys", "methodArgs", message.MethodArgs)

 		// All new elements are now added, and we can create a new hash
 		// representing the current keys in the allowed map.
 		proc.centralAuth.updateHash(proc, message)
-		er = fmt.Errorf(" * DEBUG updated hash for public keys")
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodKeysDelete: updated hash for public keys")

 		var nodes []Node

@@ -27,8 +27,7 @@ func methodHello(proc process, message Message, node string) ([]byte, error) {
 			return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
 		}

-		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodHello: Creating subscribers data folder at", "foldertree", folderTree)
 	}

 	// Open file and write data.
@@ -75,9 +74,8 @@ func procFuncHelloSubscriber(ctx context.Context, proc process, procFuncCh chan
 		select {
 		case m = <-procFuncCh:
 		case <-ctx.Done():
-			er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
+			proc.errorKernel.logDebug("procFuncHelloSubscriber: stopped handleFunc for: subscriber", "subject", proc.subject.name())
 			// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
-			proc.errorKernel.logDebug(er)
 			return nil
 		}

@@ -120,9 +118,8 @@ func procFuncHelloPublisher(ctx context.Context, proc process, procFuncCh chan M
 		select {
 		case <-ticker.C:
 		case <-ctx.Done():
-			er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
+			proc.errorKernel.logDebug("procFuncHelloPublisher: stopped handleFunc for: publisher", "subject", proc.subject.name())
 			// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
-			proc.errorKernel.logDebug(er)
 			return nil
 		}
 	}
@@ -145,8 +142,7 @@ func methodErrorLog(proc process, message Message, node string) ([]byte, error)
 			return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
 		}

-		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
-		proc.errorKernel.logDebug(er)
+		proc.errorKernel.logDebug("methodErrorLog: Creating subscribers data folder", "foldertree", folderTree)
 	}

 	// Open file and write data.
@@ -272,8 +272,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
 			return nil, fmt.Errorf("error: failed to create data folder directory %v: %v", configuration.SubscribersDataFolder, err)
 		}

-		er := fmt.Errorf("info: creating subscribers data folder at %v", configuration.SubscribersDataFolder)
-		s.errorKernel.logDebug(er)
+		s.errorKernel.logDebug("NewServer: creating subscribers data folder at", "path", configuration.SubscribersDataFolder)
 	}

 	return &s, nil
@@ -637,7 +636,6 @@ func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {
 	bSerialized, err := cbor.Marshal(msg)
 	if err != nil {
 		er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
-		s.errorKernel.logDebug(er)
 		return nil, er
 	}
