diff --git a/process.go b/process.go index 03191f6..58f878b 100644 --- a/process.go +++ b/process.go @@ -58,7 +58,7 @@ type process struct { // procFunc's can also be used to wrap in other types which we want to // work with. An example can be handling of metrics which the message // have no notion of, but a procFunc can have that wrapped in from when it was constructed. - procFunc func(ctx context.Context, procFuncCh chan Message) error + procFunc func(ctx context.Context, proc process, procFuncCh chan Message) error // The channel to send a messages to the procFunc go routine. // This is typically used within the methodHandler for so we // can pass messages between the procFunc and the handler. @@ -178,7 +178,7 @@ func (p process) startSubscriber() { // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { - err := p.procFunc(p.ctx, p.procFuncCh) + err := p.procFunc(p.ctx, p, p.procFuncCh) if err != nil { er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) p.errorKernel.errSend(p, Message{}, er, logError) diff --git a/processes.go b/processes.go index 3e738da..1182b08 100644 --- a/processes.go +++ b/processes.go @@ -5,9 +5,6 @@ import ( "fmt" "log" "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" ) // processes holds all the information about running processes @@ -114,42 +111,8 @@ func (p *processes) Start(proc process) { } if proc.configuration.StartProcesses.StartSubHello { - // subREQHello is the handler that is triggered when we are receiving a hello - // message. To keep the state of all the hello's received from nodes we need - // to also start a procFunc that will live as a go routine tied to this process, - // where the procFunc will receive messages from the handler when a message is - // received, the handler will deliver the message to the procFunc on the - // proc.procFuncCh, and we can then read that message from the procFuncCh in - // the procFunc running. - pf := func(ctx context.Context, procFuncCh chan Message) error { - // sayHelloNodes := make(map[Node]struct{}) - for { - // Receive a copy of the message sent from the method handler. - var m Message - - select { - case m = <-procFuncCh: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name()) - // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er) - return nil - } - - proc.centralAuth.addPublicKey(proc, m) - - // update the prometheus metrics - - proc.server.centralAuth.pki.nodesAcked.mu.Lock() - mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys) - proc.server.centralAuth.pki.nodesAcked.mu.Unlock() - proc.metrics.promHelloNodesTotal.Set(float64(mapLen)) - proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() - - } - } - proc.startup.startProcess(proc, Hello, pf) + proc.startup.startProcess(proc, Hello, procFuncHello) } if proc.configuration.StartProcesses.IsCentralErrorLogger { @@ -165,128 +128,17 @@ func (p *processes) Start(proc process) { } if proc.configuration.StartProcesses.StartPubHello != 0 { - pf := func(ctx context.Context, procFuncCh chan Message) error { - - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello)) - defer ticker.Stop() - for { - - // d := fmt.Sprintf("Hello from %v\n", p.node) - // Send the ed25519 public key used for signing as the payload of the message. - d := proc.server.nodeAuth.SignPublicKey - - m := Message{ - FileName: "hello.log", - Directory: "hello-messages", - ToNode: Node(p.configuration.CentralNodeName), - FromNode: Node(proc.node), - Data: []byte(d), - Method: Hello, - ACKTimeout: proc.configuration.DefaultMessageTimeout, - Retries: 1, - } - - proc.newMessagesCh <- m - - select { - case <-ticker.C: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) - // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er) - return nil - } - } - } - proc.startup.startProcess(proc, HelloPublisher, pf) + proc.startup.startProcess(proc, HelloPublisher, procFuncHelloPublisher) } if proc.configuration.StartProcesses.EnableKeyUpdates { - // Define the startup of a publisher that will send KeysRequestUpdate - // to central server and ask for publics keys, and to get them deliver back with a request - // of type KeysDeliverUpdate. - pf := func(ctx context.Context, procFuncCh chan Message) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeysUpdateInterval)) - defer ticker.Stop() - for { - - // Send a message with the hash of the currently stored keys, - // so we would know on the subscriber at central if it should send - // and update with new keys back. - - proc.nodeAuth.publicKeys.mu.Lock() - er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) - p.errorKernel.logDebug(er) - - m := Message{ - FileName: "publickeysget.log", - Directory: "publickeysget", - ToNode: Node(p.configuration.CentralNodeName), - FromNode: Node(proc.node), - Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]), - Method: KeysRequestUpdate, - ReplyMethod: KeysDeliverUpdate, - ACKTimeout: proc.configuration.DefaultMessageTimeout, - Retries: 1, - } - proc.nodeAuth.publicKeys.mu.Unlock() - - proc.newMessagesCh <- m - - select { - case <-ticker.C: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) - // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er) - return nil - } - } - } - proc.startup.startProcess(proc, KeysRequestUpdate, pf) + proc.startup.startProcess(proc, KeysRequestUpdate, procFuncKeysRequestUpdate) proc.startup.startProcess(proc, KeysDeliverUpdate, nil) } if proc.configuration.StartProcesses.EnableAclUpdates { - pf := func(ctx context.Context, procFuncCh chan Message) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval)) - defer ticker.Stop() - for { - // Send a message with the hash of the currently stored acl's, - // so we would know for the subscriber at central if it should send - // and update with new keys back. - - proc.nodeAuth.nodeAcl.mu.Lock() - er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:])) - p.errorKernel.logDebug(er) - - m := Message{ - FileName: "aclRequestUpdate.log", - Directory: "aclRequestUpdate", - ToNode: Node(p.configuration.CentralNodeName), - FromNode: Node(proc.node), - Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]), - Method: AclRequestUpdate, - ReplyMethod: AclDeliverUpdate, - ACKTimeout: proc.configuration.DefaultMessageTimeout, - Retries: 1, - } - proc.nodeAuth.nodeAcl.mu.Unlock() - - proc.newMessagesCh <- m - - select { - case <-ticker.C: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) - // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er) - return nil - } - } - } - proc.startup.startProcess(proc, AclRequestUpdate, pf) + proc.startup.startProcess(proc, AclRequestUpdate, procFuncAclRequestUpdate) proc.startup.startProcess(proc, AclDeliverUpdate, nil) } @@ -353,7 +205,7 @@ func newStartup(server *server) *startup { // startProcess will start a process. It takes the initial process, request method, // and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil. -func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { +func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, proc process, procFuncCh chan Message) error) { er := fmt.Errorf("starting %v subscriber: %#v", m, p.node) p.errorKernel.logDebug(er) diff --git a/requests.go b/requests.go index 21a272e..e4b8d24 100644 --- a/requests.go +++ b/requests.go @@ -174,20 +174,23 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { ma := MethodsAvailable{ Methodhandlers: map[Method]HandlerFunc{ - Initial: HandlerFunc(methodInitial), - OpProcessList: HandlerFunc(methodOpProcessList), - OpProcessStart: HandlerFunc(methodOpProcessStart), - OpProcessStop: HandlerFunc(methodOpProcessStop), - CliCommand: HandlerFunc(methodCliCommand), - CliCommandCont: HandlerFunc(methodCliCommandCont), - Console: HandlerFunc(methodConsole), - FileAppend: HandlerFunc(methodFileAppend), - File: HandlerFunc(methodToFile), - CopySrc: HandlerFunc(methodCopySrc), - CopyDst: HandlerFunc(methodCopyDst), - SUBCopySrc: HandlerFunc(methodSUB), - SUBCopyDst: HandlerFunc(methodSUB), - Hello: HandlerFunc(methodHello), + Initial: HandlerFunc(methodInitial), + OpProcessList: HandlerFunc(methodOpProcessList), + OpProcessStart: HandlerFunc(methodOpProcessStart), + OpProcessStop: HandlerFunc(methodOpProcessStop), + CliCommand: HandlerFunc(methodCliCommand), + CliCommandCont: HandlerFunc(methodCliCommandCont), + Console: HandlerFunc(methodConsole), + FileAppend: HandlerFunc(methodFileAppend), + File: HandlerFunc(methodToFile), + CopySrc: HandlerFunc(methodCopySrc), + CopyDst: HandlerFunc(methodCopyDst), + SUBCopySrc: HandlerFunc(methodSUB), + SUBCopyDst: HandlerFunc(methodSUB), + Hello: HandlerFunc(methodHello), + // The hello publisher will not subscribe for messages, it will + // only start a procFunc, so we we don't need a handler with a method, + // so we set it to nil. HelloPublisher: HandlerFunc(nil), ErrorLog: HandlerFunc(methodErrorLog), HttpGet: HandlerFunc(methodHttpGet), diff --git a/requests_acl.go b/requests_acl.go index f91ab9b..826e7c5 100644 --- a/requests_acl.go +++ b/requests_acl.go @@ -2,8 +2,10 @@ package ctrl import ( "bytes" + "context" "encoding/json" "fmt" + "time" "github.com/fxamacker/cbor/v2" ) @@ -88,6 +90,45 @@ func methodAclRequestUpdate(proc process, message Message, node string) ([]byte, return nil, nil } +func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error { + ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.AclUpdateInterval)) + defer ticker.Stop() + for { + + // Send a message with the hash of the currently stored acl's, + // so we would know for the subscriber at central if it should send + // and update with new keys back. + + proc.nodeAuth.nodeAcl.mu.Lock() + er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:])) + proc.errorKernel.logDebug(er) + + m := Message{ + FileName: "aclRequestUpdate.log", + Directory: "aclRequestUpdate", + ToNode: Node(proc.configuration.CentralNodeName), + FromNode: Node(proc.node), + Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]), + Method: AclRequestUpdate, + ReplyMethod: AclDeliverUpdate, + ACKTimeout: proc.configuration.DefaultMessageTimeout, + Retries: 1, + } + proc.nodeAuth.nodeAcl.mu.Unlock() + + proc.newMessagesCh <- m + + select { + case <-ticker.C: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + proc.errorKernel.logDebug(er) + return nil + } + } +} + // ---- // Handler to receive the acls from a central server. diff --git a/requests_copy.go b/requests_copy.go index b7c304d..40f1ef6 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -215,7 +215,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message) + copySrcSubProc.procFunc = copySrcSubProcFunc(cia, cancel, message) // assign a handler to the sub process copySrcSubProc.handler = copySrcSubHandler() @@ -335,7 +335,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel) + copyDstSubProc.procFunc = copyDstSubProcFunc(cia, message, cancel) // assign a handler to the sub process copyDstSubProc.handler = copyDstSubHandler() @@ -411,8 +411,8 @@ type copySubData struct { Hash [32]byte } -func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, chan Message) error { - pf := func(ctx context.Context, procFuncCh chan Message) error { +func copySrcSubProcFunc(cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, process, chan Message) error { + pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { // Check if the realpath of the directory and filename specified in the // message are of type unix socket, and if it is we do not add the extra @@ -626,9 +626,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel return pf } -func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error { +func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error { - pf := func(ctx context.Context, procFuncCh chan Message) error { + pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { csa := copySubData{ CopyStatus: copyReady, diff --git a/requests_keys.go b/requests_keys.go index fea983a..d4a7f68 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -2,8 +2,10 @@ package ctrl import ( "bytes" + "context" "encoding/json" "fmt" + "time" ) // --- @@ -119,6 +121,48 @@ func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte return nil, nil } +// Define the startup of a publisher that will send KeysRequestUpdate +// to central server and ask for publics keys, and to get them deliver back with a request +// of type KeysDeliverUpdate. +func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error { + ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.KeysUpdateInterval)) + defer ticker.Stop() + for { + + // Send a message with the hash of the currently stored keys, + // so we would know on the subscriber at central if it should send + // and update with new keys back. + + proc.nodeAuth.publicKeys.mu.Lock() + er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) + proc.errorKernel.logDebug(er) + + m := Message{ + FileName: "publickeysget.log", + Directory: "publickeysget", + ToNode: Node(proc.configuration.CentralNodeName), + FromNode: Node(proc.node), + Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]), + Method: KeysRequestUpdate, + ReplyMethod: KeysDeliverUpdate, + ACKTimeout: proc.configuration.DefaultMessageTimeout, + Retries: 1, + } + proc.nodeAuth.publicKeys.mu.Unlock() + + proc.newMessagesCh <- m + + select { + case <-ticker.C: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + proc.errorKernel.logDebug(er) + return nil + } + } +} + // ---- // Handler to receive the public keys from a central server. diff --git a/requests_std.go b/requests_std.go index 88a6c3f..2b49e69 100644 --- a/requests_std.go +++ b/requests_std.go @@ -1,11 +1,14 @@ package ctrl import ( + "context" "fmt" "log" "os" "path/filepath" "time" + + "github.com/prometheus/client_golang/prometheus" ) // ----- @@ -56,6 +59,76 @@ func methodHello(proc process, message Message, node string) ([]byte, error) { return ackMsg, nil } +// procFuncHello is the procFunc used with the hello subscriber process. +// To keep the state of all the hello's received from nodes we need +// to also start a procFunc that will live as a go routine tied to this process, +// where the procFunc will receive messages from the handler when a message is +// received, the handler will deliver the message to the procFunc on the +// proc.procFuncCh, and we can then read that message from the procFuncCh in +// the procFunc running. +func procFuncHello(ctx context.Context, proc process, procFuncCh chan Message) error { + // sayHelloNodes := make(map[Node]struct{}) + + for { + // Receive a copy of the message sent from the method handler. + var m Message + + select { + case m = <-procFuncCh: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + proc.errorKernel.logDebug(er) + return nil + } + + proc.centralAuth.addPublicKey(proc, m) + + // update the prometheus metrics + + proc.server.centralAuth.pki.nodesAcked.mu.Lock() + mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys) + proc.server.centralAuth.pki.nodesAcked.mu.Unlock() + proc.metrics.promHelloNodesTotal.Set(float64(mapLen)) + proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() + + } +} + +func procFuncHelloPublisher(ctx context.Context, proc process, procFuncCh chan Message) error { + + ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.StartProcesses.StartPubHello)) + defer ticker.Stop() + for { + + // d := fmt.Sprintf("Hello from %v\n", p.node) + // Send the ed25519 public key used for signing as the payload of the message. + d := proc.server.nodeAuth.SignPublicKey + + m := Message{ + FileName: "hello.log", + Directory: "hello-messages", + ToNode: Node(proc.configuration.CentralNodeName), + FromNode: Node(proc.node), + Data: []byte(d), + Method: Hello, + ACKTimeout: proc.configuration.DefaultMessageTimeout, + Retries: 1, + } + + proc.newMessagesCh <- m + + select { + case <-ticker.C: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + proc.errorKernel.logDebug(er) + return nil + } + } +} + // --- // Handle the writing of error logs.