1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

removed processKind, og removed not needed file check in copy request

This commit is contained in:
postmannen 2024-12-01 22:17:58 +01:00
parent 93ad86a150
commit a83029c343
6 changed files with 22 additions and 112 deletions

View file

@ -570,16 +570,6 @@ func (s *server) readHttpListener() {
}() }()
} }
// The subject are made up of different parts of the message field.
// To make things easier and to avoid figuring out what the subject
// is in all places we've created the concept of subjectAndMessage
// (sam) where we get the subject for the message once, and use the
// sam structure with subject alongside the message instead.
type subjectAndMessage struct {
Subject `json:"subject" yaml:"subject"`
Message `json:"message" yaml:"message"`
}
// convertBytesToSAMs will range over the byte representing a message given in // convertBytesToSAMs will range over the byte representing a message given in
// json format. For each element found the Message type will be converted into // json format. For each element found the Message type will be converted into
// a SubjectAndMessage type value and appended to a slice, and the slice is // a SubjectAndMessage type value and appended to a slice, and the slice is
@ -640,36 +630,3 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
return msgs return msgs
} }
// newSubjectAndMessage will look up the correct values and value types to
// be used in a subject for a Message (sam), and return the a combined structure
// of type subjectAndMessage.
func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
tmpH := mt.getHandler(m.Method)
if tmpH == nil {
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method)
}
switch {
case m.ToNode == "":
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m)
case m.Method == "":
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m)
}
sub := Subject{
ToNode: string(m.ToNode),
Method: m.Method,
}
sam := subjectAndMessage{
Subject: sub,
Message: m,
}
return sam, nil
}

View file

@ -13,25 +13,11 @@ import (
// "google.golang.org/protobuf/internal/errors" // "google.golang.org/protobuf/internal/errors"
) )
// processKind are either kindSubscriber or kindPublisher, and are
// used to distinguish the kind of process to spawn and to know
// the process kind put in the process map.
type processKind string
const (
processKindSubscriber processKind = "subscriber"
processKindPublisher processKind = "publisher"
)
// process holds all the logic to handle a message type and it's // process holds all the logic to handle a message type and it's
// method, subscription/publishin messages for a subject, and more. // method, subscription/publishin messages for a subject, and more.
type process struct { type process struct {
// isSubProcess is used to indentify subprocesses spawned by other processes. // isSubProcess is used to indentify subprocesses spawned by other processes.
isSubProcess bool isSubProcess bool
// isLongRunningPublisher is set to true for a publisher service that should not
// be auto terminated like a normal autospawned publisher would be when the the
// inactivity timeout have expired
isLongRunningPublisher bool
// server // server
server *server server *server
// messageID // messageID
@ -43,8 +29,7 @@ type process struct {
// Put a node here to be able know the node a process is at. // Put a node here to be able know the node a process is at.
node Node node Node
// The processID for the current process // The processID for the current process
processID int processID int
processKind processKind
// methodsAvailable // methodsAvailable
methodsAvailable MethodsAvailable methodsAvailable MethodsAvailable
// procFunc is a function that will be started when a worker process // procFunc is a function that will be started when a worker process
@ -117,7 +102,7 @@ type process struct {
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { func newProcess(ctx context.Context, server *server, subject Subject) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
server.processes.mu.Lock() server.processes.mu.Lock()
server.processes.lastProcessID++ server.processes.lastProcessID++
@ -134,7 +119,6 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
subject: subject, subject: subject,
node: Node(server.configuration.NodeName), node: Node(server.configuration.NodeName),
processID: pid, processID: pid,
processKind: processKind,
methodsAvailable: method.GetMethodsAvailable(), methodsAvailable: method.GetMethodsAvailable(),
newMessagesCh: server.newMessagesCh, newMessagesCh: server.newMessagesCh,
configuration: server.configuration, configuration: server.configuration,
@ -152,12 +136,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
// We use the full name of the subject to identify a unique // We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle // process. We can do that since a process can only handle
// one request type. // one request type.
if proc.processKind == processKindPublisher { proc.processName = processNameGet(proc.subject.name())
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
}
if proc.processKind == processKindSubscriber {
proc.processName = processNameGet(proc.subject.name(), processKindSubscriber)
}
return proc return proc
} }
@ -178,9 +157,7 @@ func (p process) start() {
// Start a subscriber worker, which will start a go routine (process) // Start a subscriber worker, which will start a go routine (process)
// to handle executing the request method defined in the message. // to handle executing the request method defined in the message.
if p.processKind == processKindSubscriber { p.startSubscriber()
p.startSubscriber()
}
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
p.processes.active.mu.Lock() p.processes.active.mu.Lock()
@ -711,7 +688,7 @@ func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name())
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
p.messageID++ p.messageID++

View file

@ -200,7 +200,7 @@ func (p *processes) Start(proc process) {
} }
} }
} }
proc.startup.publisher(proc, Hello, pf) proc.startup.subscriber(proc, Hello, pf)
} }
if proc.configuration.StartProcesses.EnableKeyUpdates { if proc.configuration.StartProcesses.EnableKeyUpdates {
@ -245,7 +245,7 @@ func (p *processes) Start(proc process) {
} }
} }
} }
proc.startup.publisher(proc, KeysRequestUpdate, pf) proc.startup.subscriber(proc, KeysRequestUpdate, pf)
proc.startup.subscriber(proc, KeysDeliverUpdate, nil) proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
} }
@ -288,7 +288,7 @@ func (p *processes) Start(proc process) {
} }
} }
} }
proc.startup.publisher(proc, AclRequestUpdate, pf) proc.startup.subscriber(proc, AclRequestUpdate, pf)
proc.startup.subscriber(proc, AclDeliverUpdate, nil) proc.startup.subscriber(proc, AclDeliverUpdate, nil)
} }
@ -368,25 +368,12 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
} }
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) proc := newProcess(p.ctx, p.processes.server, sub)
proc.procFunc = pf proc.procFunc = pf
go proc.start() go proc.start()
} }
// publisher will start a publisher process. It takes the initial process, request method,
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er)
sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
proc.procFunc = pf
proc.isLongRunningPublisher = true
go proc.start()
}
// --------------------------------------------------------------- // ---------------------------------------------------------------
// Print the content of the processes map. // Print the content of the processes map.
@ -398,7 +385,7 @@ func (p *processes) printProcessesMap() {
p.active.mu.Lock() p.active.mu.Lock()
for pName, proc := range p.active.procNames { for pName, proc := range p.active.procNames {
er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name()) er := fmt.Errorf("info: proc - procName in map: %v , id: %v, subject: %v", pName, proc.processID, proc.subject.name())
proc.errorKernel.logDebug(er) proc.errorKernel.logDebug(er)
} }

View file

@ -92,15 +92,6 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message) return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
} }
// Check if the filepaths for the socket a realpaths.
file := filepath.Join(message.Directory, message.FileName)
if strings.HasPrefix(file, "./") || !strings.HasPrefix(file, "/") {
er := fmt.Errorf("error: copySrcSubHandler: path in message started with ./ or no directory at all, only full paths are allowed, path given was : %v", file)
proc.errorKernel.errSend(proc, message, er, logError)
newReplyMessage(proc, message, []byte(er.Error()))
return nil, er
}
var subProcessName string var subProcessName string
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
@ -220,7 +211,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) copySrcSubProc := newSubProcess(ctx, proc.server, sub)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
@ -268,8 +259,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
} }
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. // newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { func newSubProcess(ctx context.Context, server *server, subject Subject) process {
p := newProcess(ctx, server, subject, processKind) p := newProcess(ctx, server, subject)
p.isSubProcess = true p.isSubProcess = true
return p return p
@ -321,7 +312,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// previous message is then fully up and running, so we just discard // previous message is then fully up and running, so we just discard
// that second message in those cases. // that second message in those cases.
pn := processNameGet(sub.name(), processKindSubscriber) pn := processNameGet(sub.name())
// fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn)
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
@ -340,7 +331,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
} }
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) copyDstSubProc := newSubProcess(ctx, proc.server, sub)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.

View file

@ -23,7 +23,7 @@ func methodOpProcessList(proc process, message Message, node string) ([]byte, er
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
for _, pTmp := range proc.processes.active.procNames { for _, pTmp := range proc.processes.active.procNames {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name()) s := fmt.Sprintf("%v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processID, pTmp.subject.name())
sb := []byte(s) sb := []byte(s)
out = append(out, sb...) out = append(out, sb...)
@ -68,7 +68,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
// Create the process and start it. // Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName) sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) procNew := newProcess(proc.ctx, proc.server, sub)
go procNew.start() go procNew.start()
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
@ -115,7 +115,6 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er
methodString := message.MethodArgs[0] methodString := message.MethodArgs[0]
node := message.MethodArgs[1] node := message.MethodArgs[1]
kind := message.MethodArgs[2]
method := Method(methodString) method := Method(methodString)
tmpH := mt.getHandler(Method(method)) tmpH := mt.getHandler(Method(method))
@ -132,7 +131,7 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er
// We can then use this processName to get the real values for the // We can then use this processName to get the real values for the
// actual process we want to stop. // actual process we want to stop.
sub := newSubject(method, string(node)) sub := newSubject(method, string(node))
processName := processNameGet(sub.name(), processKind(kind)) processName := processNameGet(sub.name())
// Remove the process from the processes active map if found. // Remove the process from the processes active map if found.
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()

View file

@ -23,9 +23,8 @@ import (
type processName string type processName string
// Will return a process name made up of subjectName+processKind // Will return a process name made up of subjectName+processKind
func processNameGet(sn subjectName, pk processKind) processName { func processNameGet(sn subjectName) processName {
pn := fmt.Sprintf("%s_%s", sn, pk) return processName(sn)
return processName(pn)
} }
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
@ -356,7 +355,7 @@ func (s *server) Start() {
// //
// The context of the initial process are set in processes.Start. // The context of the initial process are set in processes.Start.
sub := newSubject(Initial, s.nodeName) sub := newSubject(Initial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s, sub, "") s.processInitial = newProcess(context.TODO(), s, sub)
// Start all wanted subscriber processes. // Start all wanted subscriber processes.
s.processes.Start(s.processInitial) s.processes.Start(s.processInitial)
@ -440,7 +439,7 @@ func (s *server) directSAMSChRead() {
for i := range messages { for i := range messages {
// TODO: !!!!!! Shoud the node here be the fromNode ??????? // TODO: !!!!!! Shoud the node here be the fromNode ???????
subject := newSubject(messages[i].Method, string(messages[i].ToNode)) subject := newSubject(messages[i].Method, string(messages[i].ToNode))
processName := processNameGet(subject.name(), processKindSubscriber) processName := processNameGet(subject.name())
s.processes.active.mu.Lock() s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName] p := s.processes.active.procNames[processName]