1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

named type procFunc

This commit is contained in:
postmannen 2021-03-10 14:14:09 +01:00
parent f6771f1975
commit 8fdf82f8a2
2 changed files with 31 additions and 18 deletions

View file

@ -40,13 +40,12 @@ type process struct {
allowedReceivers map[node]struct{}
// methodsAvailable
methodsAvailable MethodsAvailable
// TESTING:
// Helper or service function that can do some kind of work
// for the process.
// The idea is that this can hold for example the map of the
// the hello nodes to limit shared resources in the system as
// a whole for sharing a map from the *server level.
procFunc func() error
procFunc procFunc
// The channel to send a messages to the procFunc go routine.
// This is typically used within the methodHandler.
procFuncCh chan Message
@ -86,6 +85,16 @@ func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage,
return proc
}
// procFunc is a helper function that will do some extra work for
// a message received for a process. This allows us to ACK back
// to the publisher that the message was received, but we can let
// the processFunc keep on working.
// This can also be used to wrap in other types which we want to
// work with that come from the outside. An example can be handling
// of metrics which the message have no notion of, but a procFunc
// can have that wrapped in from when it was constructed.
type procFunc func() error
// The purpose of this function is to check if we should start a
// publisher or subscriber process, where a process is a go routine
// that will handle either sending or receiving messages on one

View file

@ -36,9 +36,10 @@ func (s *server) ProcessesStart() {
proc.procFunc = func() error {
sayHelloNodes := make(map[node]struct{})
for {
//fmt.Printf("-- DEBUG 4.1: procFunc %v, procFuncCh %v\n\n", proc.procFunc, proc.procFuncCh)
// Receive a copy of the message sent from the method handler.
m := <-proc.procFuncCh
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
sayHelloNodes[m.FromNode] = struct{}{}
// update the prometheus metrics
@ -70,28 +71,31 @@ func (s *server) ProcessesStart() {
// sending of say hello.
if s.configuration.PublisherServiceSayhello != 0 {
fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName)
// TODO: Replace "central" name with variable below.
sub := newSubject(SayHello, EventNACK, "central")
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
proc.procFunc = func() error {
for {
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
// Define the procFun to be used for the process.
proc.procFunc = procFunc(
func() error {
for {
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
d := fmt.Sprintf("Hello from %v\n", s.nodeName)
d := fmt.Sprintf("Hello from %v\n", s.nodeName)
m := Message{
ToNode: "central",
FromNode: node(s.nodeName),
Data: []string{d},
Method: SayHello,
m := Message{
ToNode: "central",
FromNode: node(s.nodeName),
Data: []string{d},
Method: SayHello,
}
sam := createSAMfromMessage(m)
proc.newMessagesCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello))
}
sam := createSAMfromMessage(m)
proc.newMessagesCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello))
}
}
})
go proc.spawnWorker(s)
}
}