From a83029c34310de8f4fa04f301b24a66ce5c4e7fb Mon Sep 17 00:00:00 2001 From: postmannen Date: Sun, 1 Dec 2024 22:17:58 +0100 Subject: [PATCH] removed processKind, og removed not needed file check in copy request --- message_readers.go | 43 ------------------------------------------- process.go | 33 +++++---------------------------- processes.go | 23 +++++------------------ requests_copy.go | 19 +++++-------------- requests_operator.go | 7 +++---- server.go | 9 ++++----- 6 files changed, 22 insertions(+), 112 deletions(-) diff --git a/message_readers.go b/message_readers.go index 2948124..0607c1e 100644 --- a/message_readers.go +++ b/message_readers.go @@ -570,16 +570,6 @@ func (s *server) readHttpListener() { }() } -// The subject are made up of different parts of the message field. -// To make things easier and to avoid figuring out what the subject -// is in all places we've created the concept of subjectAndMessage -// (sam) where we get the subject for the message once, and use the -// sam structure with subject alongside the message instead. -type subjectAndMessage struct { - Subject `json:"subject" yaml:"subject"` - Message `json:"message" yaml:"message"` -} - // convertBytesToSAMs will range over the byte representing a message given in // json format. For each element found the Message type will be converted into // a SubjectAndMessage type value and appended to a slice, and the slice is @@ -640,36 +630,3 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message { return msgs } - -// newSubjectAndMessage will look up the correct values and value types to -// be used in a subject for a Message (sam), and return the a combined structure -// of type subjectAndMessage. -func newSubjectAndMessage(m Message) (subjectAndMessage, error) { - // We need to create a tempory method type to look up the kind for the - // real method for the message. - var mt Method - - tmpH := mt.getHandler(m.Method) - if tmpH == nil { - return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method) - } - - switch { - case m.ToNode == "": - return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m) - case m.Method == "": - return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m) - } - - sub := Subject{ - ToNode: string(m.ToNode), - Method: m.Method, - } - - sam := subjectAndMessage{ - Subject: sub, - Message: m, - } - - return sam, nil -} diff --git a/process.go b/process.go index 53ef14f..29fb346 100644 --- a/process.go +++ b/process.go @@ -13,25 +13,11 @@ import ( // "google.golang.org/protobuf/internal/errors" ) -// processKind are either kindSubscriber or kindPublisher, and are -// used to distinguish the kind of process to spawn and to know -// the process kind put in the process map. -type processKind string - -const ( - processKindSubscriber processKind = "subscriber" - processKindPublisher processKind = "publisher" -) - // process holds all the logic to handle a message type and it's // method, subscription/publishin messages for a subject, and more. type process struct { // isSubProcess is used to indentify subprocesses spawned by other processes. isSubProcess bool - // isLongRunningPublisher is set to true for a publisher service that should not - // be auto terminated like a normal autospawned publisher would be when the the - // inactivity timeout have expired - isLongRunningPublisher bool // server server *server // messageID @@ -43,8 +29,7 @@ type process struct { // Put a node here to be able know the node a process is at. node Node // The processID for the current process - processID int - processKind processKind + processID int // methodsAvailable methodsAvailable MethodsAvailable // procFunc is a function that will be started when a worker process @@ -117,7 +102,7 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { +func newProcess(ctx context.Context, server *server, subject Subject) process { // create the initial configuration for a sessions communicating with 1 host process. server.processes.mu.Lock() server.processes.lastProcessID++ @@ -134,7 +119,6 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin subject: subject, node: Node(server.configuration.NodeName), processID: pid, - processKind: processKind, methodsAvailable: method.GetMethodsAvailable(), newMessagesCh: server.newMessagesCh, configuration: server.configuration, @@ -152,12 +136,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one request type. - if proc.processKind == processKindPublisher { - proc.processName = processNameGet(proc.subject.name(), processKindPublisher) - } - if proc.processKind == processKindSubscriber { - proc.processName = processNameGet(proc.subject.name(), processKindSubscriber) - } + proc.processName = processNameGet(proc.subject.name()) return proc } @@ -178,9 +157,7 @@ func (p process) start() { // Start a subscriber worker, which will start a go routine (process) // to handle executing the request method defined in the message. - if p.processKind == processKindSubscriber { - p.startSubscriber() - } + p.startSubscriber() // Add information about the new process to the started processes map. p.processes.active.mu.Lock() @@ -711,7 +688,7 @@ func (p process) publishAMessage(m Message, natsConn *nats.Conn) { // Get the process name so we can look up the process in the // processes map, and increment the message counter. - pn := processNameGet(p.subject.name(), processKindPublisher) + pn := processNameGet(p.subject.name()) // Increment the counter for the next message to be sent. p.messageID++ diff --git a/processes.go b/processes.go index 84f5b45..3cf8c96 100644 --- a/processes.go +++ b/processes.go @@ -200,7 +200,7 @@ func (p *processes) Start(proc process) { } } } - proc.startup.publisher(proc, Hello, pf) + proc.startup.subscriber(proc, Hello, pf) } if proc.configuration.StartProcesses.EnableKeyUpdates { @@ -245,7 +245,7 @@ func (p *processes) Start(proc process) { } } } - proc.startup.publisher(proc, KeysRequestUpdate, pf) + proc.startup.subscriber(proc, KeysRequestUpdate, pf) proc.startup.subscriber(proc, KeysDeliverUpdate, nil) } @@ -288,7 +288,7 @@ func (p *processes) Start(proc process) { } } } - proc.startup.publisher(proc, AclRequestUpdate, pf) + proc.startup.subscriber(proc, AclRequestUpdate, pf) proc.startup.subscriber(proc, AclDeliverUpdate, nil) } @@ -368,25 +368,12 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p } fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) - proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) + proc := newProcess(p.ctx, p.processes.server, sub) proc.procFunc = pf go proc.start() } -// publisher will start a publisher process. It takes the initial process, request method, -// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil. -func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { - er := fmt.Errorf("starting %v publisher: %#v", m, p.node) - p.errorKernel.logDebug(er) - sub := newSubject(m, string(p.node)) - proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher) - proc.procFunc = pf - proc.isLongRunningPublisher = true - - go proc.start() -} - // --------------------------------------------------------------- // Print the content of the processes map. @@ -398,7 +385,7 @@ func (p *processes) printProcessesMap() { p.active.mu.Lock() for pName, proc := range p.active.procNames { - er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name()) + er := fmt.Errorf("info: proc - procName in map: %v , id: %v, subject: %v", pName, proc.processID, proc.subject.name()) proc.errorKernel.logDebug(er) } diff --git a/requests_copy.go b/requests_copy.go index a759fbd..06ce326 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -92,15 +92,6 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message) } - // Check if the filepaths for the socket a realpaths. - file := filepath.Join(message.Directory, message.FileName) - if strings.HasPrefix(file, "./") || !strings.HasPrefix(file, "/") { - er := fmt.Errorf("error: copySrcSubHandler: path in message started with ./ or no directory at all, only full paths are allowed, path given was : %v", file) - proc.errorKernel.errSend(proc, message, er, logError) - newReplyMessage(proc, message, []byte(er.Error())) - return nil, er - } - var subProcessName string proc.processes.wg.Add(1) @@ -220,7 +211,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // Create a new sub process that will do the actual file copying. - copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) + copySrcSubProc := newSubProcess(ctx, proc.server, sub) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. @@ -268,8 +259,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { } // newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. -func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { - p := newProcess(ctx, server, subject, processKind) +func newSubProcess(ctx context.Context, server *server, subject Subject) process { + p := newProcess(ctx, server, subject) p.isSubProcess = true return p @@ -321,7 +312,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { // previous message is then fully up and running, so we just discard // that second message in those cases. - pn := processNameGet(sub.name(), processKindSubscriber) + pn := processNameGet(sub.name()) // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) proc.processes.active.mu.Lock() @@ -340,7 +331,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { } // Create a new sub process that will do the actual file copying. - copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) + copyDstSubProc := newSubProcess(ctx, proc.server, sub) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. diff --git a/requests_operator.go b/requests_operator.go index 6de3ba2..bd4889d 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -23,7 +23,7 @@ func methodOpProcessList(proc process, message Message, node string) ([]byte, er proc.processes.active.mu.Lock() for _, pTmp := range proc.processes.active.procNames { - s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name()) + s := fmt.Sprintf("%v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processID, pTmp.subject.name()) sb := []byte(s) out = append(out, sb...) @@ -68,7 +68,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) + procNew := newProcess(proc.ctx, proc.server, sub) go procNew.start() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) @@ -115,7 +115,6 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er methodString := message.MethodArgs[0] node := message.MethodArgs[1] - kind := message.MethodArgs[2] method := Method(methodString) tmpH := mt.getHandler(Method(method)) @@ -132,7 +131,7 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er // We can then use this processName to get the real values for the // actual process we want to stop. sub := newSubject(method, string(node)) - processName := processNameGet(sub.name(), processKind(kind)) + processName := processNameGet(sub.name()) // Remove the process from the processes active map if found. proc.processes.active.mu.Lock() diff --git a/server.go b/server.go index 4642882..aa4d81d 100644 --- a/server.go +++ b/server.go @@ -23,9 +23,8 @@ import ( type processName string // Will return a process name made up of subjectName+processKind -func processNameGet(sn subjectName, pk processKind) processName { - pn := fmt.Sprintf("%s_%s", sn, pk) - return processName(pn) +func processNameGet(sn subjectName) processName { + return processName(sn) } // server is the structure that will hold the state about spawned @@ -356,7 +355,7 @@ func (s *server) Start() { // // The context of the initial process are set in processes.Start. sub := newSubject(Initial, s.nodeName) - s.processInitial = newProcess(context.TODO(), s, sub, "") + s.processInitial = newProcess(context.TODO(), s, sub) // Start all wanted subscriber processes. s.processes.Start(s.processInitial) @@ -440,7 +439,7 @@ func (s *server) directSAMSChRead() { for i := range messages { // TODO: !!!!!! Shoud the node here be the fromNode ??????? subject := newSubject(messages[i].Method, string(messages[i].ToNode)) - processName := processNameGet(subject.name(), processKindSubscriber) + processName := processNameGet(subject.name()) s.processes.active.mu.Lock() p := s.processes.active.procNames[processName]