1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

moved all the procfunc definitions in the processes startup as separate functions defined after theyr respective request method where they belong

This commit is contained in:
postmannen 2024-12-13 16:49:21 +01:00
parent 3a31ced938
commit f99af8e0ab
7 changed files with 188 additions and 175 deletions

View file

@ -58,7 +58,7 @@ type process struct {
// procFunc's can also be used to wrap in other types which we want to // procFunc's can also be used to wrap in other types which we want to
// work with. An example can be handling of metrics which the message // work with. An example can be handling of metrics which the message
// have no notion of, but a procFunc can have that wrapped in from when it was constructed. // have no notion of, but a procFunc can have that wrapped in from when it was constructed.
procFunc func(ctx context.Context, procFuncCh chan Message) error procFunc func(ctx context.Context, proc process, procFuncCh chan Message) error
// The channel to send a messages to the procFunc go routine. // The channel to send a messages to the procFunc go routine.
// This is typically used within the methodHandler for so we // This is typically used within the methodHandler for so we
// can pass messages between the procFunc and the handler. // can pass messages between the procFunc and the handler.
@ -178,7 +178,7 @@ func (p process) startSubscriber() {
// Start the procFunc in it's own anonymous func so we are able // Start the procFunc in it's own anonymous func so we are able
// to get the return error. // to get the return error.
go func() { go func() {
err := p.procFunc(p.ctx, p.procFuncCh) err := p.procFunc(p.ctx, p, p.procFuncCh)
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logError) p.errorKernel.errSend(p, Message{}, er, logError)

View file

@ -5,9 +5,6 @@ import (
"fmt" "fmt"
"log" "log"
"sync" "sync"
"time"
"github.com/prometheus/client_golang/prometheus"
) )
// processes holds all the information about running processes // processes holds all the information about running processes
@ -114,42 +111,8 @@ func (p *processes) Start(proc process) {
} }
if proc.configuration.StartProcesses.StartSubHello { if proc.configuration.StartProcesses.StartSubHello {
// subREQHello is the handler that is triggered when we are receiving a hello
// message. To keep the state of all the hello's received from nodes we need
// to also start a procFunc that will live as a go routine tied to this process,
// where the procFunc will receive messages from the handler when a message is
// received, the handler will deliver the message to the procFunc on the
// proc.procFuncCh, and we can then read that message from the procFuncCh in
// the procFunc running.
pf := func(ctx context.Context, procFuncCh chan Message) error {
// sayHelloNodes := make(map[Node]struct{})
for { proc.startup.startProcess(proc, Hello, procFuncHello)
// Receive a copy of the message sent from the method handler.
var m Message
select {
case m = <-procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
proc.centralAuth.addPublicKey(proc, m)
// update the prometheus metrics
proc.server.centralAuth.pki.nodesAcked.mu.Lock()
mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
proc.server.centralAuth.pki.nodesAcked.mu.Unlock()
proc.metrics.promHelloNodesTotal.Set(float64(mapLen))
proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
}
}
proc.startup.startProcess(proc, Hello, pf)
} }
if proc.configuration.StartProcesses.IsCentralErrorLogger { if proc.configuration.StartProcesses.IsCentralErrorLogger {
@ -165,128 +128,17 @@ func (p *processes) Start(proc process) {
} }
if proc.configuration.StartProcesses.StartPubHello != 0 { if proc.configuration.StartProcesses.StartPubHello != 0 {
pf := func(ctx context.Context, procFuncCh chan Message) error { proc.startup.startProcess(proc, HelloPublisher, procFuncHelloPublisher)
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
defer ticker.Stop()
for {
// d := fmt.Sprintf("Hello from %v\n", p.node)
// Send the ed25519 public key used for signing as the payload of the message.
d := proc.server.nodeAuth.SignPublicKey
m := Message{
FileName: "hello.log",
Directory: "hello-messages",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(d),
Method: Hello,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, HelloPublisher, pf)
} }
if proc.configuration.StartProcesses.EnableKeyUpdates { if proc.configuration.StartProcesses.EnableKeyUpdates {
// Define the startup of a publisher that will send KeysRequestUpdate proc.startup.startProcess(proc, KeysRequestUpdate, procFuncKeysRequestUpdate)
// to central server and ask for publics keys, and to get them deliver back with a request
// of type KeysDeliverUpdate.
pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeysUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored keys,
// so we would know on the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.publicKeys.mu.Lock()
er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
p.errorKernel.logDebug(er)
m := Message{
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
Method: KeysRequestUpdate,
ReplyMethod: KeysDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.publicKeys.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, KeysRequestUpdate, pf)
proc.startup.startProcess(proc, KeysDeliverUpdate, nil) proc.startup.startProcess(proc, KeysDeliverUpdate, nil)
} }
if proc.configuration.StartProcesses.EnableAclUpdates { if proc.configuration.StartProcesses.EnableAclUpdates {
pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored acl's, proc.startup.startProcess(proc, AclRequestUpdate, procFuncAclRequestUpdate)
// so we would know for the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.nodeAcl.mu.Lock()
er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
p.errorKernel.logDebug(er)
m := Message{
FileName: "aclRequestUpdate.log",
Directory: "aclRequestUpdate",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]),
Method: AclRequestUpdate,
ReplyMethod: AclDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.nodeAcl.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, AclRequestUpdate, pf)
proc.startup.startProcess(proc, AclDeliverUpdate, nil) proc.startup.startProcess(proc, AclDeliverUpdate, nil)
} }
@ -353,7 +205,7 @@ func newStartup(server *server) *startup {
// startProcess will start a process. It takes the initial process, request method, // startProcess will start a process. It takes the initial process, request method,
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil. // and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, proc process, procFuncCh chan Message) error) {
er := fmt.Errorf("starting %v subscriber: %#v", m, p.node) er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)

View file

@ -188,6 +188,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
SUBCopySrc: HandlerFunc(methodSUB), SUBCopySrc: HandlerFunc(methodSUB),
SUBCopyDst: HandlerFunc(methodSUB), SUBCopyDst: HandlerFunc(methodSUB),
Hello: HandlerFunc(methodHello), Hello: HandlerFunc(methodHello),
// The hello publisher will not subscribe for messages, it will
// only start a procFunc, so we we don't need a handler with a method,
// so we set it to nil.
HelloPublisher: HandlerFunc(nil), HelloPublisher: HandlerFunc(nil),
ErrorLog: HandlerFunc(methodErrorLog), ErrorLog: HandlerFunc(methodErrorLog),
HttpGet: HandlerFunc(methodHttpGet), HttpGet: HandlerFunc(methodHttpGet),

View file

@ -2,8 +2,10 @@ package ctrl
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
) )
@ -88,6 +90,45 @@ func methodAclRequestUpdate(proc process, message Message, node string) ([]byte,
return nil, nil return nil, nil
} }
func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.AclUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored acl's,
// so we would know for the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.nodeAcl.mu.Lock()
er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
proc.errorKernel.logDebug(er)
m := Message{
FileName: "aclRequestUpdate.log",
Directory: "aclRequestUpdate",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]),
Method: AclRequestUpdate,
ReplyMethod: AclDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.nodeAcl.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// ---- // ----
// Handler to receive the acls from a central server. // Handler to receive the acls from a central server.

View file

@ -215,7 +215,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message) copySrcSubProc.procFunc = copySrcSubProcFunc(cia, cancel, message)
// assign a handler to the sub process // assign a handler to the sub process
copySrcSubProc.handler = copySrcSubHandler() copySrcSubProc.handler = copySrcSubHandler()
@ -335,7 +335,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel) copyDstSubProc.procFunc = copyDstSubProcFunc(cia, message, cancel)
// assign a handler to the sub process // assign a handler to the sub process
copyDstSubProc.handler = copyDstSubHandler() copyDstSubProc.handler = copyDstSubHandler()
@ -411,8 +411,8 @@ type copySubData struct {
Hash [32]byte Hash [32]byte
} }
func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, chan Message) error { func copySrcSubProcFunc(cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, process, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
// Check if the realpath of the directory and filename specified in the // Check if the realpath of the directory and filename specified in the
// message are of type unix socket, and if it is we do not add the extra // message are of type unix socket, and if it is we do not add the extra
@ -626,9 +626,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
return pf return pf
} }
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error { func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
csa := copySubData{ csa := copySubData{
CopyStatus: copyReady, CopyStatus: copyReady,

View file

@ -2,8 +2,10 @@ package ctrl
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
) )
// --- // ---
@ -119,6 +121,48 @@ func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte
return nil, nil return nil, nil
} }
// Define the startup of a publisher that will send KeysRequestUpdate
// to central server and ask for publics keys, and to get them deliver back with a request
// of type KeysDeliverUpdate.
func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.KeysUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored keys,
// so we would know on the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.publicKeys.mu.Lock()
er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
proc.errorKernel.logDebug(er)
m := Message{
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
Method: KeysRequestUpdate,
ReplyMethod: KeysDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.publicKeys.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// ---- // ----
// Handler to receive the public keys from a central server. // Handler to receive the public keys from a central server.

View file

@ -1,11 +1,14 @@
package ctrl package ctrl
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
) )
// ----- // -----
@ -56,6 +59,76 @@ func methodHello(proc process, message Message, node string) ([]byte, error) {
return ackMsg, nil return ackMsg, nil
} }
// procFuncHello is the procFunc used with the hello subscriber process.
// To keep the state of all the hello's received from nodes we need
// to also start a procFunc that will live as a go routine tied to this process,
// where the procFunc will receive messages from the handler when a message is
// received, the handler will deliver the message to the procFunc on the
// proc.procFuncCh, and we can then read that message from the procFuncCh in
// the procFunc running.
func procFuncHello(ctx context.Context, proc process, procFuncCh chan Message) error {
// sayHelloNodes := make(map[Node]struct{})
for {
// Receive a copy of the message sent from the method handler.
var m Message
select {
case m = <-procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
proc.centralAuth.addPublicKey(proc, m)
// update the prometheus metrics
proc.server.centralAuth.pki.nodesAcked.mu.Lock()
mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
proc.server.centralAuth.pki.nodesAcked.mu.Unlock()
proc.metrics.promHelloNodesTotal.Set(float64(mapLen))
proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
}
}
func procFuncHelloPublisher(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.StartProcesses.StartPubHello))
defer ticker.Stop()
for {
// d := fmt.Sprintf("Hello from %v\n", p.node)
// Send the ed25519 public key used for signing as the payload of the message.
d := proc.server.nodeAuth.SignPublicKey
m := Message{
FileName: "hello.log",
Directory: "hello-messages",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(d),
Method: Hello,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// --- // ---
// Handle the writing of error logs. // Handle the writing of error logs.