From 8fdf82f8a2854fae6c609a4a7255de84aeb25f65 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 10 Mar 2021 14:14:09 +0100 Subject: [PATCH] named type procFunc --- process.go | 13 +++++++++++-- processesToStart.go | 36 ++++++++++++++++++++---------------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/process.go b/process.go index 3bff036..77bb2d9 100644 --- a/process.go +++ b/process.go @@ -40,13 +40,12 @@ type process struct { allowedReceivers map[node]struct{} // methodsAvailable methodsAvailable MethodsAvailable - // TESTING: // Helper or service function that can do some kind of work // for the process. // The idea is that this can hold for example the map of the // the hello nodes to limit shared resources in the system as // a whole for sharing a map from the *server level. - procFunc func() error + procFunc procFunc // The channel to send a messages to the procFunc go routine. // This is typically used within the methodHandler. procFuncCh chan Message @@ -86,6 +85,16 @@ func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage, return proc } +// procFunc is a helper function that will do some extra work for +// a message received for a process. This allows us to ACK back +// to the publisher that the message was received, but we can let +// the processFunc keep on working. +// This can also be used to wrap in other types which we want to +// work with that come from the outside. An example can be handling +// of metrics which the message have no notion of, but a procFunc +// can have that wrapped in from when it was constructed. +type procFunc func() error + // The purpose of this function is to check if we should start a // publisher or subscriber process, where a process is a go routine // that will handle either sending or receiving messages on one diff --git a/processesToStart.go b/processesToStart.go index b79e593..8d00654 100644 --- a/processesToStart.go +++ b/processesToStart.go @@ -36,9 +36,10 @@ func (s *server) ProcessesStart() { proc.procFunc = func() error { sayHelloNodes := make(map[node]struct{}) for { - //fmt.Printf("-- DEBUG 4.1: procFunc %v, procFuncCh %v\n\n", proc.procFunc, proc.procFuncCh) + // Receive a copy of the message sent from the method handler. m := <-proc.procFuncCh fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) + sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics @@ -70,28 +71,31 @@ func (s *server) ProcessesStart() { // sending of say hello. if s.configuration.PublisherServiceSayhello != 0 { fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName) + // TODO: Replace "central" name with variable below. sub := newSubject(SayHello, EventNACK, "central") proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) - proc.procFunc = func() error { - for { - fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) + // Define the procFun to be used for the process. + proc.procFunc = procFunc( + func() error { + for { + fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) - d := fmt.Sprintf("Hello from %v\n", s.nodeName) + d := fmt.Sprintf("Hello from %v\n", s.nodeName) - m := Message{ - ToNode: "central", - FromNode: node(s.nodeName), - Data: []string{d}, - Method: SayHello, + m := Message{ + ToNode: "central", + FromNode: node(s.nodeName), + Data: []string{d}, + Method: SayHello, + } + + sam := createSAMfromMessage(m) + proc.newMessagesCh <- []subjectAndMessage{sam} + time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello)) } - - sam := createSAMfromMessage(m) - proc.newMessagesCh <- []subjectAndMessage{sam} - time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello)) - } - } + }) go proc.spawnWorker(s) } }