1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

moved all the procfunc definitions in the processes startup as separate functions defined after theyr respective request method where they belong

This commit is contained in:
postmannen 2024-12-06 11:03:53 +01:00
parent 3a31ced938
commit df07b926e2
7 changed files with 188 additions and 175 deletions

View file

@ -58,7 +58,7 @@ type process struct {
// procFunc's can also be used to wrap in other types which we want to
// work with. An example can be handling of metrics which the message
// have no notion of, but a procFunc can have that wrapped in from when it was constructed.
procFunc func(ctx context.Context, procFuncCh chan Message) error
procFunc func(ctx context.Context, proc process, procFuncCh chan Message) error
// The channel to send a messages to the procFunc go routine.
// This is typically used within the methodHandler for so we
// can pass messages between the procFunc and the handler.
@ -178,7 +178,7 @@ func (p process) startSubscriber() {
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
err := p.procFunc(p.ctx, p, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logError)

View file

@ -5,9 +5,6 @@ import (
"fmt"
"log"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// processes holds all the information about running processes
@ -114,42 +111,8 @@ func (p *processes) Start(proc process) {
}
if proc.configuration.StartProcesses.StartSubHello {
// subREQHello is the handler that is triggered when we are receiving a hello
// message. To keep the state of all the hello's received from nodes we need
// to also start a procFunc that will live as a go routine tied to this process,
// where the procFunc will receive messages from the handler when a message is
// received, the handler will deliver the message to the procFunc on the
// proc.procFuncCh, and we can then read that message from the procFuncCh in
// the procFunc running.
pf := func(ctx context.Context, procFuncCh chan Message) error {
// sayHelloNodes := make(map[Node]struct{})
for {
// Receive a copy of the message sent from the method handler.
var m Message
select {
case m = <-procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
proc.centralAuth.addPublicKey(proc, m)
// update the prometheus metrics
proc.server.centralAuth.pki.nodesAcked.mu.Lock()
mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
proc.server.centralAuth.pki.nodesAcked.mu.Unlock()
proc.metrics.promHelloNodesTotal.Set(float64(mapLen))
proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
}
}
proc.startup.startProcess(proc, Hello, pf)
proc.startup.startProcess(proc, Hello, procFuncHello)
}
if proc.configuration.StartProcesses.IsCentralErrorLogger {
@ -165,128 +128,17 @@ func (p *processes) Start(proc process) {
}
if proc.configuration.StartProcesses.StartPubHello != 0 {
pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
defer ticker.Stop()
for {
// d := fmt.Sprintf("Hello from %v\n", p.node)
// Send the ed25519 public key used for signing as the payload of the message.
d := proc.server.nodeAuth.SignPublicKey
m := Message{
FileName: "hello.log",
Directory: "hello-messages",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(d),
Method: Hello,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, HelloPublisher, pf)
proc.startup.startProcess(proc, HelloPublisher, procFuncHelloPublisher)
}
if proc.configuration.StartProcesses.EnableKeyUpdates {
// Define the startup of a publisher that will send KeysRequestUpdate
// to central server and ask for publics keys, and to get them deliver back with a request
// of type KeysDeliverUpdate.
pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeysUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored keys,
// so we would know on the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.publicKeys.mu.Lock()
er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
p.errorKernel.logDebug(er)
m := Message{
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
Method: KeysRequestUpdate,
ReplyMethod: KeysDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.publicKeys.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, KeysRequestUpdate, pf)
proc.startup.startProcess(proc, KeysRequestUpdate, procFuncKeysRequestUpdate)
proc.startup.startProcess(proc, KeysDeliverUpdate, nil)
}
if proc.configuration.StartProcesses.EnableAclUpdates {
pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored acl's,
// so we would know for the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.nodeAcl.mu.Lock()
er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
p.errorKernel.logDebug(er)
m := Message{
FileName: "aclRequestUpdate.log",
Directory: "aclRequestUpdate",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]),
Method: AclRequestUpdate,
ReplyMethod: AclDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.nodeAcl.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
p.errorKernel.logDebug(er)
return nil
}
}
}
proc.startup.startProcess(proc, AclRequestUpdate, pf)
proc.startup.startProcess(proc, AclRequestUpdate, procFuncAclRequestUpdate)
proc.startup.startProcess(proc, AclDeliverUpdate, nil)
}
@ -353,7 +205,7 @@ func newStartup(server *server) *startup {
// startProcess will start a process. It takes the initial process, request method,
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, proc process, procFuncCh chan Message) error) {
er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
p.errorKernel.logDebug(er)

View file

@ -188,6 +188,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
SUBCopySrc: HandlerFunc(methodSUB),
SUBCopyDst: HandlerFunc(methodSUB),
Hello: HandlerFunc(methodHello),
// The hello publisher will not subscribe for messages, it will
// only start a procFunc, so we we don't need a handler with a method,
// so we set it to nil.
HelloPublisher: HandlerFunc(nil),
ErrorLog: HandlerFunc(methodErrorLog),
HttpGet: HandlerFunc(methodHttpGet),

View file

@ -2,8 +2,10 @@ package ctrl
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/fxamacker/cbor/v2"
)
@ -88,6 +90,45 @@ func methodAclRequestUpdate(proc process, message Message, node string) ([]byte,
return nil, nil
}
func procFuncAclRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.AclUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored acl's,
// so we would know for the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.nodeAcl.mu.Lock()
er := fmt.Errorf(" ----> publisher AclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
proc.errorKernel.logDebug(er)
m := Message{
FileName: "aclRequestUpdate.log",
Directory: "aclRequestUpdate",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]),
Method: AclRequestUpdate,
ReplyMethod: AclDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.nodeAcl.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// ----
// Handler to receive the acls from a central server.

View file

@ -215,7 +215,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message)
copySrcSubProc.procFunc = copySrcSubProcFunc(cia, cancel, message)
// assign a handler to the sub process
copySrcSubProc.handler = copySrcSubHandler()
@ -335,7 +335,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel)
copyDstSubProc.procFunc = copyDstSubProcFunc(cia, message, cancel)
// assign a handler to the sub process
copyDstSubProc.handler = copyDstSubHandler()
@ -411,8 +411,8 @@ type copySubData struct {
Hash [32]byte
}
func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
func copySrcSubProcFunc(cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, process, chan Message) error {
pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
// Check if the realpath of the directory and filename specified in the
// message are of type unix socket, and if it is we do not add the extra
@ -626,9 +626,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
return pf
}
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error {
func copyDstSubProcFunc(cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
csa := copySubData{
CopyStatus: copyReady,

View file

@ -2,8 +2,10 @@ package ctrl
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
)
// ---
@ -119,6 +121,48 @@ func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte
return nil, nil
}
// Define the startup of a publisher that will send KeysRequestUpdate
// to central server and ask for publics keys, and to get them deliver back with a request
// of type KeysDeliverUpdate.
func procFuncKeysRequestUpdate(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.KeysUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored keys,
// so we would know on the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.publicKeys.mu.Lock()
er := fmt.Errorf(" ----> publisher KeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
proc.errorKernel.logDebug(er)
m := Message{
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
Method: KeysRequestUpdate,
ReplyMethod: KeysDeliverUpdate,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.publicKeys.mu.Unlock()
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// ----
// Handler to receive the public keys from a central server.

View file

@ -1,11 +1,14 @@
package ctrl
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// -----
@ -56,6 +59,76 @@ func methodHello(proc process, message Message, node string) ([]byte, error) {
return ackMsg, nil
}
// procFuncHello is the procFunc used with the hello subscriber process.
// To keep the state of all the hello's received from nodes we need
// to also start a procFunc that will live as a go routine tied to this process,
// where the procFunc will receive messages from the handler when a message is
// received, the handler will deliver the message to the procFunc on the
// proc.procFuncCh, and we can then read that message from the procFuncCh in
// the procFunc running.
func procFuncHello(ctx context.Context, proc process, procFuncCh chan Message) error {
// sayHelloNodes := make(map[Node]struct{})
for {
// Receive a copy of the message sent from the method handler.
var m Message
select {
case m = <-procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
proc.centralAuth.addPublicKey(proc, m)
// update the prometheus metrics
proc.server.centralAuth.pki.nodesAcked.mu.Lock()
mapLen := len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
proc.server.centralAuth.pki.nodesAcked.mu.Unlock()
proc.metrics.promHelloNodesTotal.Set(float64(mapLen))
proc.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
}
}
func procFuncHelloPublisher(ctx context.Context, proc process, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(proc.configuration.StartProcesses.StartPubHello))
defer ticker.Stop()
for {
// d := fmt.Sprintf("Hello from %v\n", p.node)
// Send the ed25519 public key used for signing as the payload of the message.
d := proc.server.nodeAuth.SignPublicKey
m := Message{
FileName: "hello.log",
Directory: "hello-messages",
ToNode: Node(proc.configuration.CentralNodeName),
FromNode: Node(proc.node),
Data: []byte(d),
Method: Hello,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.newMessagesCh <- m
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
proc.errorKernel.logDebug(er)
return nil
}
}
}
// ---
// Handle the writing of error logs.