mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
renamed logConsoleOnlyIfDebug to logDebug
This commit is contained in:
parent
076abdef71
commit
8c8061dc15
15 changed files with 162 additions and 162 deletions
|
@ -334,7 +334,7 @@ func (c *centralAuth) generateACLsForAllNodes() error {
|
|||
}
|
||||
|
||||
inf := fmt.Errorf("generateACLsFor all nodes, ACLsToConvert contains: %#v", c.accessLists.schemaGenerated.ACLsToConvert)
|
||||
c.accessLists.errorKernel.logConsoleOnlyIfDebug(inf, c.accessLists.configuration)
|
||||
c.accessLists.errorKernel.logDebug(inf, c.accessLists.configuration)
|
||||
|
||||
// ACLsToConvert got the complete picture of what ACL's that
|
||||
// are defined for each individual host node.
|
||||
|
@ -387,7 +387,7 @@ func (c *centralAuth) generateACLsForAllNodes() error {
|
|||
}()
|
||||
|
||||
inf = fmt.Errorf("generateACLsFor all nodes, GeneratedACLsMap contains: %#v", c.accessLists.schemaGenerated.GeneratedACLsMap)
|
||||
c.accessLists.errorKernel.logConsoleOnlyIfDebug(inf, c.accessLists.configuration)
|
||||
c.accessLists.errorKernel.logDebug(inf, c.accessLists.configuration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
|
|||
db, err := bolt.Open(databaseFilepath, 0660, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("newPKI: error: failed to open db: %v", err)
|
||||
errorKernel.logConsoleOnlyIfDebug(er, configuration)
|
||||
errorKernel.logDebug(er, configuration)
|
||||
return &p
|
||||
}
|
||||
|
||||
|
@ -91,7 +91,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
|
|||
keys, err := p.dbDumpPublicKey()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("newPKI: dbPublicKeyDump failed, probably empty db: %v", err)
|
||||
errorKernel.logConsoleOnlyIfDebug(er, configuration)
|
||||
errorKernel.logDebug(er, configuration)
|
||||
}
|
||||
|
||||
// Only assign from storage to in memory map if the storage contained any values.
|
||||
|
@ -99,7 +99,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
|
|||
p.nodesAcked.keysAndHash.Keys = keys
|
||||
for k, v := range keys {
|
||||
er := fmt.Errorf("newPKI: public keys db contains: %v, %v", k, []byte(v))
|
||||
errorKernel.logConsoleOnlyIfDebug(er, configuration)
|
||||
errorKernel.logDebug(er, configuration)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
|||
|
||||
if ok && bytes.Equal(existingKey, msg.Data) {
|
||||
er := fmt.Errorf("info: public key value for REGISTERED node %v is the same, doing nothing", msg.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -147,7 +147,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
|||
|
||||
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
||||
c.pki.errorKernel.infoSend(proc, msg, er)
|
||||
c.pki.errorKernel.logConsoleOnlyIfDebug(er, c.pki.configuration)
|
||||
c.pki.errorKernel.logDebug(er, c.pki.configuration)
|
||||
}
|
||||
|
||||
// deletePublicKeys to the db if the node do not exist, or if it is a new value.
|
||||
|
@ -169,7 +169,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
c.pki.errorKernel.infoSend(proc, msg, er)
|
||||
}
|
||||
|
||||
|
@ -230,7 +230,7 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error {
|
|||
err := bu.Delete([]byte(n))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: delete key in bucket %v failed: %v", bucket, err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return er
|
||||
}
|
||||
}
|
||||
|
|
|
@ -288,7 +288,7 @@ func (e *errorKernel) logWarn(err error, c *Configuration) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *errorKernel) logConsoleOnlyIfDebug(err error, c *Configuration) {
|
||||
func (e *errorKernel) logDebug(err error, c *Configuration) {
|
||||
if c.LogLevel == string(logDebug) {
|
||||
slog.Debug(err.Error())
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ func (s *server) readFolder() {
|
|||
|
||||
if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod {
|
||||
er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
|
||||
func() {
|
||||
fh, err := os.Open(event.Name)
|
||||
|
|
|
@ -112,7 +112,7 @@ func (n *nodeAcl) loadFromFile() error {
|
|||
// Just logging the error since it is not crucial that a key file is missing,
|
||||
// since a new one will be created on the next update.
|
||||
er := fmt.Errorf("acl: loadFromFile: no acl file found at %v", n.filePath)
|
||||
n.errorKernel.logConsoleOnlyIfDebug(er, n.configuration)
|
||||
n.errorKernel.logDebug(er, n.configuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ func (n *nodeAcl) loadFromFile() error {
|
|||
}
|
||||
|
||||
er := fmt.Errorf("nodeAcl: loadFromFile: Loaded existing acl's from file: %v", n.aclAndHash.Hash)
|
||||
n.errorKernel.logConsoleOnlyIfDebug(er, n.configuration)
|
||||
n.errorKernel.logDebug(er, n.configuration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ func (p *publicKeys) loadFromFile() error {
|
|||
}
|
||||
|
||||
er := fmt.Errorf("nodeAuth: loadFromFile: Loaded existing keys from file: %v", p.keysAndHash.Hash)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
68
process.go
68
process.go
|
@ -201,7 +201,7 @@ func (p process) spawnWorker() {
|
|||
p.processes.active.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf("successfully started process: %v", p.processName)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
func (p process) startPublisher() {
|
||||
|
@ -253,7 +253,7 @@ func (p process) startSubscriber() {
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
p.processes.active.mu.Lock()
|
||||
|
@ -261,7 +261,7 @@ func (p process) startSubscriber() {
|
|||
p.processes.active.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf("successfully stopped process: %v", p.processName)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
}()
|
||||
}
|
||||
|
@ -295,7 +295,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: preparing to send nats message with subject %v, id: %v", msg.Subject, message.ID)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -307,7 +307,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
err := natsConn.PublishMsg(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: nats publish for message with subject failed: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return ErrACKSubscribeRetry
|
||||
}
|
||||
p.metrics.promNatsDeliveredTotal.Inc()
|
||||
|
@ -349,7 +349,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
}
|
||||
|
||||
er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
// The SubscribeSync used in the subscriber, will get messages that
|
||||
// are sent after it started subscribing.
|
||||
|
@ -360,14 +360,14 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
err := subReply.Unsubscribe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||
er = fmt.Errorf("%v, waiting equal to RetryWait %ds before retrying", er, message.RetryWait)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||
|
||||
|
@ -379,7 +379,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: nats publish failed: %v, waiting equal to RetryWait of %ds before retrying", err, message.RetryWait)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||
|
||||
return ErrACKSubscribeRetry
|
||||
|
@ -397,7 +397,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
switch {
|
||||
case err == nats.ErrNoResponders || err == nats.ErrTimeout:
|
||||
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
||||
|
@ -406,13 +406,13 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
|
||||
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
|
||||
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return er
|
||||
|
||||
default:
|
||||
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return er
|
||||
}
|
||||
|
@ -437,7 +437,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
p.metrics.promNatsDeliveredTotal.Inc()
|
||||
|
||||
er = fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -465,7 +465,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
// If debugging is enabled, print the source node name of the nats messages received.
|
||||
if val, ok := msg.Header["fromNode"]; ok {
|
||||
er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
// If compression is used, decompress it to get the gob data. If
|
||||
|
@ -570,7 +570,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
// Check for ACK type Event.
|
||||
case message.ACKTimeout >= 1:
|
||||
er := fmt.Errorf("subscriberHandler: received ACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
// When spawning sub processes we can directly assign handlers to the process upon
|
||||
// creation. We here check if a handler is already assigned, and if it is nil, we
|
||||
// lookup and find the correct handler to use if available.
|
||||
|
@ -595,7 +595,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
|
||||
case message.ACKTimeout < 1:
|
||||
er := fmt.Errorf("subscriberHandler: received NACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
// When spawning sub processes we can directly assign handlers to the process upon
|
||||
// creation. We here check if a handler is already assigned, and if it is nil, we
|
||||
// lookup and find the correct handler to use if available.
|
||||
|
@ -638,7 +638,7 @@ func (p process) callHandler(message Message, thisNode string) []byte {
|
|||
// ACL/Signature checking failed.
|
||||
er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing")
|
||||
p.errorKernel.errSend(p, message, er, logWarning)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -668,7 +668,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
if p.configuration.EnableAclCheck {
|
||||
// Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler.
|
||||
er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
switch {
|
||||
|
@ -679,7 +679,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -703,7 +703,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -711,7 +711,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
select {
|
||||
case <-p.ctx.Done():
|
||||
er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
//cancel()
|
||||
return
|
||||
|
@ -719,7 +719,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
// Total time reached. End the process.
|
||||
//cancel()
|
||||
er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return
|
||||
|
||||
|
@ -729,7 +729,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -757,7 +757,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
|
|||
sigOK := p.nodeAuth.verifySignature(message)
|
||||
|
||||
er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig: Only signature checking enabled, ALLOW the message if sigOK, sigOK=%v, method %v", sigOK, message.Method)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
if sigOK {
|
||||
doHandler = true
|
||||
|
@ -769,7 +769,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
|
|||
aclOK := p.nodeAuth.verifyAcl(message)
|
||||
|
||||
er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig:both signature and acl checking enabled, allow the message if sigOK and aclOK, or method is not REQCliCommand, sigOK=%v, aclOK=%v, method=%v", sigOK, aclOK, message.Method)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
if sigOK && aclOK {
|
||||
doHandler = true
|
||||
|
@ -779,7 +779,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
|
|||
// of doHandler=false, so the handler is not done.
|
||||
default:
|
||||
er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig: None of the verify flags matched, not doing handler for message, method=%v", message.Method)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
return doHandler
|
||||
|
@ -799,7 +799,7 @@ func (p process) subscribeMessages() *nats.Subscription {
|
|||
})
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: Subscribe failed: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -858,7 +858,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
|||
|
||||
er := fmt.Errorf("info: canceled publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
return
|
||||
//}
|
||||
|
@ -873,7 +873,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
|||
case <-p.ctx.Done():
|
||||
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -903,7 +903,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
|||
b, err := cbor.Marshal(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -916,7 +916,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
|||
err := gobEnc.Encode(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -960,7 +960,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
|||
_, err := gzipW.Write(natsMsgPayloadSerialized)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to write gzip: %v", err)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -976,11 +976,11 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
|||
default: // no compression
|
||||
// Allways log the error to console.
|
||||
er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
// We only wan't to send the error message to errorCentral once.
|
||||
once.Do(func() {
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
})
|
||||
|
||||
// No compression, so we just assign the value of the serialized
|
||||
|
|
98
processes.go
98
processes.go
|
@ -97,7 +97,7 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
{
|
||||
er := fmt.Errorf("tarting REQOpProcessList subscriber: %#v", proc.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQOpProcessList, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -105,7 +105,7 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
{
|
||||
er := fmt.Errorf("starting REQOpProcessStart subscriber: %#v", proc.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQOpProcessStart, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -113,7 +113,7 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
{
|
||||
er := fmt.Errorf("starting REQOpProcessStop subscriber: %#v", proc.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQOpProcessStop, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -121,7 +121,7 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
{
|
||||
er := fmt.Errorf("starting REQTest subscriber: %#v", proc.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQTest, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -264,7 +264,7 @@ func newStartup(server *server) *startup {
|
|||
func (s startup) subREQHttpGet(p process) {
|
||||
|
||||
er := fmt.Errorf("starting Http Get subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQHttpGet, string(p.node))
|
||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -275,7 +275,7 @@ func (s startup) subREQHttpGet(p process) {
|
|||
func (s startup) subREQHttpGetScheduled(p process) {
|
||||
|
||||
er := fmt.Errorf("starting Http Get Scheduled subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
sub := newSubject(REQHttpGetScheduled, string(p.node))
|
||||
proc := newProcess(p.ctx, p.server, sub, processKindSubscriber, nil)
|
||||
|
@ -286,7 +286,7 @@ func (s startup) subREQHttpGetScheduled(p process) {
|
|||
|
||||
func (s startup) pubREQHello(p process) {
|
||||
er := fmt.Errorf("starting Hello Publisher: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
|
||||
|
@ -325,7 +325,7 @@ func (s startup) pubREQHello(p process) {
|
|||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -338,7 +338,7 @@ func (s startup) pubREQHello(p process) {
|
|||
// of type pubREQKeysDeliverUpdate.
|
||||
func (s startup) pubREQKeysRequestUpdate(p process) {
|
||||
er := fmt.Errorf("starting PublicKeysGet Publisher: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
sub := newSubject(REQKeysRequestUpdate, p.configuration.CentralNodeName)
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
|
||||
|
@ -355,7 +355,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
|
|||
|
||||
proc.nodeAuth.publicKeys.mu.Lock()
|
||||
er := fmt.Errorf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
m := Message{
|
||||
FileName: "publickeysget.log",
|
||||
|
@ -382,7 +382,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
|
|||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
|
|||
// of type pubREQKeysDeliverUpdate.
|
||||
func (s startup) pubREQAclRequestUpdate(p process) {
|
||||
er := fmt.Errorf("starting REQAclRequestUpdate Publisher: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
sub := newSubject(REQAclRequestUpdate, p.configuration.CentralNodeName)
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
|
||||
|
@ -412,7 +412,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
|
|||
|
||||
proc.nodeAuth.nodeAcl.mu.Lock()
|
||||
er := fmt.Errorf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
m := Message{
|
||||
FileName: "aclRequestUpdate.log",
|
||||
|
@ -440,7 +440,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
|
|||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -450,7 +450,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
|
|||
|
||||
func (s startup) subREQKeysRequestUpdate(p process) {
|
||||
er := fmt.Errorf("starting Public keys request update subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQKeysRequestUpdate, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -458,7 +458,7 @@ func (s startup) subREQKeysRequestUpdate(p process) {
|
|||
|
||||
func (s startup) subREQKeysDeliverUpdate(p process) {
|
||||
er := fmt.Errorf("starting Public keys to Node subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQKeysDeliverUpdate, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -466,7 +466,7 @@ func (s startup) subREQKeysDeliverUpdate(p process) {
|
|||
|
||||
func (s startup) subREQKeysAllow(p process) {
|
||||
er := fmt.Errorf("starting Public keys allow subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQKeysAllow, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -474,7 +474,7 @@ func (s startup) subREQKeysAllow(p process) {
|
|||
|
||||
func (s startup) subREQKeysDelete(p process) {
|
||||
er := fmt.Errorf("starting Public keys delete subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQKeysDelete, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -482,7 +482,7 @@ func (s startup) subREQKeysDelete(p process) {
|
|||
|
||||
func (s startup) subREQAclRequestUpdate(p process) {
|
||||
er := fmt.Errorf("starting Acl Request update subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclRequestUpdate, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -490,7 +490,7 @@ func (s startup) subREQAclRequestUpdate(p process) {
|
|||
|
||||
func (s startup) subREQAclDeliverUpdate(p process) {
|
||||
er := fmt.Errorf("starting Acl deliver update subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclDeliverUpdate, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -500,7 +500,7 @@ func (s startup) subREQAclDeliverUpdate(p process) {
|
|||
|
||||
func (s startup) subREQAclAddCommand(p process) {
|
||||
er := fmt.Errorf("starting Acl Add Command subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclAddCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -508,7 +508,7 @@ func (s startup) subREQAclAddCommand(p process) {
|
|||
|
||||
func (s startup) subREQAclDeleteCommand(p process) {
|
||||
er := fmt.Errorf("starting Acl Delete Command subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclDeleteCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -516,7 +516,7 @@ func (s startup) subREQAclDeleteCommand(p process) {
|
|||
|
||||
func (s startup) subREQAclDeleteSource(p process) {
|
||||
er := fmt.Errorf("starting Acl Delete Source subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclDeleteSource, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -524,7 +524,7 @@ func (s startup) subREQAclDeleteSource(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupNodesAddNode(p process) {
|
||||
er := fmt.Errorf("starting Acl Add node to nodeGroup subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupNodesAddNode, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -532,7 +532,7 @@ func (s startup) subREQAclGroupNodesAddNode(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupNodesDeleteNode(p process) {
|
||||
er := fmt.Errorf("starting Acl Delete node from nodeGroup subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupNodesDeleteNode, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -540,7 +540,7 @@ func (s startup) subREQAclGroupNodesDeleteNode(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupNodesDeleteGroup(p process) {
|
||||
er := fmt.Errorf("starting Acl Delete nodeGroup subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupNodesDeleteGroup, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -548,7 +548,7 @@ func (s startup) subREQAclGroupNodesDeleteGroup(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupCommandsAddCommand(p process) {
|
||||
er := fmt.Errorf("starting Acl add command to command group subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupCommandsAddCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -556,7 +556,7 @@ func (s startup) subREQAclGroupCommandsAddCommand(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupCommandsDeleteCommand(p process) {
|
||||
er := fmt.Errorf("starting Acl delete command from command group subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupCommandsDeleteCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -564,7 +564,7 @@ func (s startup) subREQAclGroupCommandsDeleteCommand(p process) {
|
|||
|
||||
func (s startup) subREQAclGroupCommandsDeleteGroup(p process) {
|
||||
er := fmt.Errorf("starting Acl delete command group subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclGroupCommandsDeleteGroup, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -572,7 +572,7 @@ func (s startup) subREQAclGroupCommandsDeleteGroup(p process) {
|
|||
|
||||
func (s startup) subREQAclExport(p process) {
|
||||
er := fmt.Errorf("starting Acl export subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclExport, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -580,7 +580,7 @@ func (s startup) subREQAclExport(p process) {
|
|||
|
||||
func (s startup) subREQAclImport(p process) {
|
||||
er := fmt.Errorf("starting Acl import subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQAclImport, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -588,7 +588,7 @@ func (s startup) subREQAclImport(p process) {
|
|||
|
||||
func (s startup) subREQToConsole(p process) {
|
||||
er := fmt.Errorf("starting Text To Console subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQToConsole, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -596,7 +596,7 @@ func (s startup) subREQToConsole(p process) {
|
|||
|
||||
func (s startup) subREQTuiToConsole(p process) {
|
||||
er := fmt.Errorf("starting Tui To Console subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQTuiToConsole, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -604,7 +604,7 @@ func (s startup) subREQTuiToConsole(p process) {
|
|||
|
||||
func (s startup) subREQCliCommand(p process) {
|
||||
er := fmt.Errorf("starting CLICommand Request subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQCliCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -612,7 +612,7 @@ func (s startup) subREQCliCommand(p process) {
|
|||
|
||||
func (s startup) subREQPong(p process) {
|
||||
er := fmt.Errorf("starting Pong subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQPong, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -620,7 +620,7 @@ func (s startup) subREQPong(p process) {
|
|||
|
||||
func (s startup) subREQPing(p process) {
|
||||
er := fmt.Errorf("starting Ping Request subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQPing, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -628,7 +628,7 @@ func (s startup) subREQPing(p process) {
|
|||
|
||||
func (s startup) subREQErrorLog(p process) {
|
||||
er := fmt.Errorf("starting REQErrorLog subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQErrorLog, "errorCentral")
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
|
@ -643,7 +643,7 @@ func (s startup) subREQErrorLog(p process) {
|
|||
// the procFunc running.
|
||||
func (s startup) subREQHello(p process) {
|
||||
er := fmt.Errorf("starting Hello subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQHello, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -663,7 +663,7 @@ func (s startup) subREQHello(p process) {
|
|||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -684,7 +684,7 @@ func (s startup) subREQHello(p process) {
|
|||
|
||||
func (s startup) subREQToFile(p process) {
|
||||
er := fmt.Errorf("starting text to file subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQToFile, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -693,7 +693,7 @@ func (s startup) subREQToFile(p process) {
|
|||
|
||||
func (s startup) subREQToFileNACK(p process) {
|
||||
er := fmt.Errorf("starting text to file subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQToFileNACK, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -702,7 +702,7 @@ func (s startup) subREQToFileNACK(p process) {
|
|||
|
||||
func (s startup) subREQCopySrc(p process) {
|
||||
er := fmt.Errorf("starting copy src subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQCopySrc, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -711,7 +711,7 @@ func (s startup) subREQCopySrc(p process) {
|
|||
|
||||
func (s startup) subREQCopyDst(p process) {
|
||||
er := fmt.Errorf("starting copy dst subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQCopyDst, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -720,7 +720,7 @@ func (s startup) subREQCopyDst(p process) {
|
|||
|
||||
func (s startup) subREQToFileAppend(p process) {
|
||||
er := fmt.Errorf("starting text logging subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQToFileAppend, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -729,7 +729,7 @@ func (s startup) subREQToFileAppend(p process) {
|
|||
|
||||
func (s startup) subREQTailFile(p process) {
|
||||
er := fmt.Errorf("starting tail log files subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQTailFile, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -738,7 +738,7 @@ func (s startup) subREQTailFile(p process) {
|
|||
|
||||
func (s startup) subREQCliCommandCont(p process) {
|
||||
er := fmt.Errorf("starting cli command with continous delivery: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQCliCommandCont, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -747,7 +747,7 @@ func (s startup) subREQCliCommandCont(p process) {
|
|||
|
||||
func (s startup) subREQPublicKey(p process) {
|
||||
er := fmt.Errorf("starting get Public Key subscriber: %#v", p.node)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(REQPublicKey, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
|
||||
|
@ -759,14 +759,14 @@ func (s startup) subREQPublicKey(p process) {
|
|||
// Print the content of the processes map.
|
||||
func (p *processes) printProcessesMap() {
|
||||
er := fmt.Errorf("output of processes map : ")
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
|
||||
{
|
||||
p.active.mu.Lock()
|
||||
|
||||
for pName, proc := range p.active.procNames {
|
||||
er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name())
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
|
||||
|
|
|
@ -21,7 +21,7 @@ func (m methodREQAclRequestUpdate) getKind() Event {
|
|||
// Handler to get all acl's from a central server.
|
||||
func (m methodREQAclRequestUpdate) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- subscriber methodREQAclRequestUpdate received from: %v, hash data = %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
// fmt.Printf("\n --- subscriber methodREQAclRequestUpdate: the message brought to handler : %+v\n", message)
|
||||
|
||||
|
@ -53,22 +53,22 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s
|
|||
defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// Check if the received hash is the same as the one currently active,
|
||||
// If it is the same we exit the handler immediately.
|
||||
hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash
|
||||
hash := hash32[:]
|
||||
er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: the central acl hash=%v", hash32)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
if bytes.Equal(hash, message.Data) {
|
||||
er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER")
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl")
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// Generate JSON for Message.Data
|
||||
|
||||
|
@ -85,7 +85,7 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s
|
|||
}
|
||||
|
||||
er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
newReplyMessage(proc, message, js)
|
||||
}()
|
||||
|
@ -109,7 +109,7 @@ func (m methodREQAclDeliverUpdate) getKind() Event {
|
|||
// Handler to receive the acls from a central server.
|
||||
func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- subscriber methodREQAclDeliverUpdate received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
// fmt.Printf("\n --- subscriber methodREQAclRequestUpdate: the message received on handler : %+v\n\n", message)
|
||||
|
||||
|
@ -194,7 +194,7 @@ func (m methodREQAclAddCommand) getKind() Event {
|
|||
|
||||
func (m methodREQAclAddCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -264,7 +264,7 @@ func (m methodREQAclDeleteCommand) getKind() Event {
|
|||
|
||||
func (m methodREQAclDeleteCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -334,7 +334,7 @@ func (m methodREQAclDeleteSource) getKind() Event {
|
|||
|
||||
func (m methodREQAclDeleteSource) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclDeleteSource received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -403,7 +403,7 @@ func (m methodREQAclGroupNodesAddNode) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupNodesAddNode) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupNodesAddNode received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -472,7 +472,7 @@ func (m methodREQAclGroupNodesDeleteNode) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupNodesDeleteNode) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupNodesDeleteNode received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -541,7 +541,7 @@ func (m methodREQAclGroupNodesDeleteGroup) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupNodesDeleteGroup) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupNodesDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -609,7 +609,7 @@ func (m methodREQAclGroupCommandsAddCommand) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupCommandsAddCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupCommandsAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -678,7 +678,7 @@ func (m methodREQAclGroupCommandsDeleteCommand) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupCommandsDeleteCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -747,7 +747,7 @@ func (m methodREQAclGroupCommandsDeleteGroup) getKind() Event {
|
|||
|
||||
func (m methodREQAclGroupCommandsDeleteGroup) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -815,7 +815,7 @@ func (m methodREQAclExport) getKind() Event {
|
|||
|
||||
func (m methodREQAclExport) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclExport received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -879,7 +879,7 @@ func (m methodREQAclImport) getKind() Event {
|
|||
|
||||
func (m methodREQAclImport) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQAclImport received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
|
|
@ -21,7 +21,7 @@ func (m methodREQCliCommand) getKind() Event {
|
|||
// as a new message.
|
||||
func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
msgForErrors := message
|
||||
msgForErrors.FileName = msgForErrors.FileName + ".error"
|
||||
|
@ -146,7 +146,7 @@ func (m methodREQCliCommandCont) getKind() Event {
|
|||
// back as it is generated, and not just when the command is finished.
|
||||
func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
msgForErrors := message
|
||||
msgForErrors.FileName = msgForErrors.FileName + ".error"
|
||||
|
|
|
@ -122,7 +122,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
folderPermission := uint64(0755)
|
||||
|
||||
er := fmt.Errorf("info: before switch: FolderPermission defined in message for socket: %04o", folderPermission)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
// Verify and check the methodArgs
|
||||
|
||||
if len(message.MethodArgs) < 3 {
|
||||
|
@ -161,7 +161,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: FolderPermission defined in message for socket: %v, converted = %v", message.MethodArgs[5], folderPermission)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopySrc: unable to convert folderPermission into int value: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
@ -199,7 +199,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
if err != nil {
|
||||
// errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err)
|
||||
er := fmt.Errorf("error: copySrcSubProcFunc: failed to stat file: %v", err)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -349,7 +349,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
|
|||
|
||||
if ok {
|
||||
er := fmt.Errorf("methodREQCopyDst: subprocesses already existed, will not start another subscriber for %v", pn)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// HERE!!!
|
||||
// If the process name already existed we return here before any
|
||||
|
@ -391,10 +391,10 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by
|
|||
select {
|
||||
case <-proc.ctx.Done():
|
||||
er := fmt.Errorf(" * copySrcHandler ended: %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
case proc.procFuncCh <- message:
|
||||
er := fmt.Errorf("copySrcHandler: passing message over to procFunc: %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
|
@ -409,10 +409,10 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by
|
|||
select {
|
||||
case <-proc.ctx.Done():
|
||||
er := fmt.Errorf(" * copyDstHandler ended: %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
case proc.procFuncCh <- message:
|
||||
er := fmt.Errorf("copyDstHandler: passing message over to procFunc: %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
|
@ -472,7 +472,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf(" info: canceling copySrcProcFunc : %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return nil
|
||||
|
||||
// Pick up the message recived by the copySrcSubHandler.
|
||||
|
@ -708,7 +708,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf(" * copyDstProcFunc ended: %v", proc.processName)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return nil
|
||||
case message := <-procFuncCh:
|
||||
var csa copySubData
|
||||
|
@ -724,7 +724,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
hash := sha256.Sum256(csa.CopyData)
|
||||
if hash != csa.Hash {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: hash of received message is not correct for: %v", cia.DstMethod)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
csa.CopyStatus = copyResendLast
|
||||
}
|
||||
|
@ -829,7 +829,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
|
||||
// HERE:
|
||||
er := fmt.Errorf("info: Before creating folder: cia.FolderPermission: %04o", cia.FolderPermission)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
if _, err := os.Stat(cia.DstDir); os.IsNotExist(err) {
|
||||
// TODO: Add option to set permission here ???
|
||||
|
@ -838,7 +838,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
return fmt.Errorf("error: failed to create destination directory for file copying %v: %v", cia.DstDir, err)
|
||||
}
|
||||
er := fmt.Errorf("info: Created folder: with cia.FolderPermission: %04o", cia.FolderPermission)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Rename the file so we got a backup.
|
||||
|
@ -937,7 +937,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
}
|
||||
|
||||
er = fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// Signal back to src that we are done, so it can cancel the process.
|
||||
{
|
||||
|
|
|
@ -32,7 +32,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
|
@ -85,7 +85,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
|
@ -125,7 +125,7 @@ func (m methodREQTailFile) getKind() Event {
|
|||
// as a new message.
|
||||
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
|
|
@ -20,7 +20,7 @@ func (m methodREQHttpGet) getKind() Event {
|
|||
// handler to do a Http Get.
|
||||
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
msgForErrors := message
|
||||
msgForErrors.FileName = msgForErrors.FileName + ".error"
|
||||
|
@ -129,7 +129,7 @@ func (m methodREQHttpGetScheduled) getKind() Event {
|
|||
// The second element of the MethodArgs slice holds the timer defined in seconds.
|
||||
func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
|
|
@ -102,21 +102,21 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
|||
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// Check if the received hash is the same as the one currently active,
|
||||
if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
|
||||
er := fmt.Errorf("info: methodREQKeysRequestUpdate: node %v and central have equal keys, nothing to do, exiting key update handler", message.FromNode)
|
||||
// proc.errorKernel.infoSend(proc, message, er)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
return
|
||||
}
|
||||
|
||||
er = fmt.Errorf("info: methodREQKeysRequestUpdate: node %v and central had not equal keys, preparing to send new version of keys", message.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
er = fmt.Errorf("info: methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
||||
|
||||
|
@ -125,7 +125,7 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
|||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
newReplyMessage(proc, message, b)
|
||||
}()
|
||||
}
|
||||
|
@ -187,7 +187,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
|||
}
|
||||
|
||||
er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// If the received map was empty we also want to delete all the locally stored keys,
|
||||
// else we copy the marshaled keysAndHash we received from central into our map.
|
||||
|
@ -324,7 +324,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
|
|||
func pushKeys(proc process, message Message, nodes []Node) error {
|
||||
er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes)
|
||||
var knh []byte
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
err := func() error {
|
||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||
|
@ -351,7 +351,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
// For all nodes that is not ack'ed we try to send an update once.
|
||||
for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap {
|
||||
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
msg := Message{
|
||||
ToNode: n,
|
||||
Method: REQKeysDeliverUpdate,
|
||||
|
@ -369,7 +369,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
er = fmt.Errorf("----> methodREQKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Create the data payload of the current allowed keys.
|
||||
|
@ -397,7 +397,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
// For all nodes that is ack'ed we try to send an update once.
|
||||
for n := range nodeMap {
|
||||
er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
msg := Message{
|
||||
ToNode: n,
|
||||
Method: REQKeysDeliverUpdate,
|
||||
|
@ -416,7 +416,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
er = fmt.Errorf("----> methodREQKeysAllow: sending keys update to node=%v", message.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -433,7 +433,7 @@ func (m methodREQKeysDelete) getKind() Event {
|
|||
|
||||
func (m methodREQKeysDelete) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
proc.errorKernel.logDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -463,13 +463,13 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
|
|||
|
||||
proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs)
|
||||
er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
// All new elements are now added, and we can create a new hash
|
||||
// representing the current keys in the allowed map.
|
||||
proc.centralAuth.updateHash(proc, message)
|
||||
er = fmt.Errorf(" * DEBUG updated hash for public keys")
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
|
||||
var nodes []Node
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
|
@ -90,7 +90,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
|
@ -142,7 +142,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file.
|
||||
|
@ -205,7 +205,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
proc.errorKernel.logDebug(er, proc.configuration)
|
||||
}
|
||||
|
||||
// Open file.
|
||||
|
|
16
server.go
16
server.go
|
@ -200,7 +200,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: creating subscribers data folder at %v", configuration.SubscribersDataFolder)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
|
@ -480,7 +480,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
}
|
||||
if ok && ctxCanceled {
|
||||
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
delete(proc.processes.active.procNames, proc.processName)
|
||||
return false
|
||||
}
|
||||
|
@ -491,10 +491,10 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
select {
|
||||
case proc.subject.messageCh <- m:
|
||||
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
case <-proc.ctx.Done():
|
||||
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
}
|
||||
|
||||
return true
|
||||
|
@ -510,7 +510,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
}
|
||||
|
||||
er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
|
||||
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
||||
var proc process
|
||||
|
@ -523,17 +523,17 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
|
||||
proc.spawnWorker()
|
||||
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
|
||||
// Now when the process is spawned we continue,
|
||||
// and send the message to that new process.
|
||||
select {
|
||||
case proc.subject.messageCh <- m:
|
||||
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
case <-proc.ctx.Done():
|
||||
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
|
||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||
s.errorKernel.logDebug(er, s.configuration)
|
||||
}
|
||||
|
||||
}(samDBVal)
|
||||
|
|
Loading…
Reference in a new issue