From 077bbf37dab6964171a03e9f9350d6253b2498dc Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 9 Mar 2021 11:58:50 +0100 Subject: [PATCH] Publishers as pure processes seems to initially work --- publisher-services.go | 109 ++++++++++++++++++++---------------------- server.go | 32 ++++++++----- subscribers.go | 47 +++++++++++++++++- 3 files changed, 118 insertions(+), 70 deletions(-) diff --git a/publisher-services.go b/publisher-services.go index e1388f0..4ea16e2 100644 --- a/publisher-services.go +++ b/publisher-services.go @@ -1,58 +1,55 @@ package steward -import ( - "fmt" - "time" -) - -type publisherServices struct { - sayHelloPublisher sayHelloPublisher -} - -func newPublisherServices(sayHelloInterval int) *publisherServices { - ps := publisherServices{ - sayHelloPublisher: sayHelloPublisher{ - interval: sayHelloInterval, - }, - } - return &ps -} - -// --- - -type sayHelloPublisher struct { - interval int -} - -func (s *sayHelloPublisher) start(newMessagesCh chan<- []subjectAndMessage, fromNode node) { - go func() { - for { - sam := s.createMsg(fromNode) - newMessagesCh <- []subjectAndMessage{sam} - time.Sleep(time.Second * time.Duration(s.interval)) - } - }() -} - -// Will prepare a subject and message with the content -// of the error -func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage { - // TESTING: Creating an error message to send to errorCentral - m := fmt.Sprintf("Hello from %v\n", FromNode) - - sam := subjectAndMessage{ - Subject: Subject{ - ToNode: "central", - CommandOrEvent: EventNACK, - Method: SayHello, - }, - Message: Message{ - ToNode: "central", - FromNode: FromNode, - Data: []string{m}, - Method: SayHello, - }, - } - - return sam -} +// // REMOVED: +// type publisherServices struct { +// sayHelloPublisher sayHelloPublisher +// } +// +// func newPublisherServices(sayHelloInterval int) *publisherServices { +// ps := publisherServices{ +// sayHelloPublisher: sayHelloPublisher{ +// interval: sayHelloInterval, +// }, +// } +// return &ps +// } +// +// // --- +// +// type sayHelloPublisher struct { +// interval int +// } +// +// func (s *sayHelloPublisher) start(newMessagesCh chan<- []subjectAndMessage, fromNode node) { +// go func() { +// for { +// sam := s.createMsg(fromNode) +// newMessagesCh <- []subjectAndMessage{sam} +// time.Sleep(time.Second * time.Duration(s.interval)) +// } +// }() +// } +// +// // Will prepare a subject and message with the content +// // of the error +// func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage { +// // TESTING: Creating an error message to send to errorCentral +// m := fmt.Sprintf("Hello from %v\n", FromNode) +// +// sam := subjectAndMessage{ +// Subject: Subject{ +// ToNode: "central", +// CommandOrEvent: EventNACK, +// Method: SayHello, +// }, +// Message: Message{ +// ToNode: "central", +// FromNode: FromNode, +// Data: []string{m}, +// Method: SayHello, +// }, +// } +// +// return sam +// } +// diff --git a/server.go b/server.go index f5310e2..0f6389d 100644 --- a/server.go +++ b/server.go @@ -60,7 +60,8 @@ type server struct { metrics *metrics // Is this the central error logger ? // collection of the publisher services and the types to control them - publisherServices *publisherServices + // REMOVED: + // publisherServices *publisherServices centralErrorLogger bool } @@ -72,13 +73,14 @@ func NewServer(c *Configuration) (*server, error) { } s := &server{ - configuration: c, - nodeName: c.NodeName, - natsConn: conn, - processes: newProcesses(), - newMessagesCh: make(chan []subjectAndMessage), - metrics: newMetrics(c.PromHostAndPort), - publisherServices: newPublisherServices(c.PublisherServiceSayhello), + configuration: c, + nodeName: c.NodeName, + natsConn: conn, + processes: newProcesses(), + newMessagesCh: make(chan []subjectAndMessage), + metrics: newMetrics(c.PromHostAndPort), + // REMOVED: + //publisherServices: newPublisherServices(c.PublisherServiceSayhello), centralErrorLogger: c.CentralErrorLogger, } @@ -118,10 +120,11 @@ func (s *server) Start() { // Start the checking the input file for new messages from operator. go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) - // if enabled, start the sayHello I'm here service at the given interval - if s.publisherServices.sayHelloPublisher.interval != 0 { - go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName)) - } + // // if enabled, start the sayHello I'm here service at the given interval + // // REMOVED: + // if s.publisherServices.sayHelloPublisher.interval != 0 { + // go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName)) + // } // Start up the predefined subscribers. // TODO: What to subscribe on should be handled via flags, or config @@ -143,7 +146,7 @@ func (s *server) printProcessesMap() { fmt.Printf("*** Output of processes map :\n") s.processes.mu.Lock() for _, v := range s.processes.active { - fmt.Printf("* proc - : id: %v, name: %v, allowed from: %v\n", v.processID, v.subject.name(), v.allowedReceivers) + fmt.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers) } s.processes.mu.Unlock() @@ -254,7 +257,10 @@ func (s *server) routeMessagesToPublish(dbFileName string, newSAM chan []subject subjName := sam.Subject.name() // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) pn := processNameGet(subjName, processKindPublisher) + + s.processes.mu.Lock() _, ok := s.processes.active[pn] + s.processes.mu.Unlock() // Are there already a process for that subject, put the // message on that processes incomming message channel. diff --git a/subscribers.go b/subscribers.go index d9fdd83..9d60f33 100644 --- a/subscribers.go +++ b/subscribers.go @@ -2,6 +2,7 @@ package steward import ( "fmt" + "time" "github.com/prometheus/client_golang/prometheus" ) @@ -36,7 +37,7 @@ func (s *server) subscribersStart() { for { //fmt.Printf("-- DEBUG 4.1: procFunc %v, procFuncCh %v\n\n", proc.procFunc, proc.procFuncCh) m := <-proc.procFuncCh - fmt.Printf("-----------DEBUG : THIS IS THE procFunc BEING CALLED !!!!! ---------\n") + fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics @@ -61,4 +62,48 @@ func (s *server) subscribersStart() { go proc.spawnWorker(s) } } + + // --------- Testing with publisher ------------ + // Define a process of kind publisher with subject for SayHello to central, + // and register a procFunc with the process that will handle the actual + // sending of say hello. + if s.configuration.PublisherServiceSayhello != 0 { + fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName) + // TODO: Replace "central" name with variable below. + sub := newSubject(SayHello, EventNACK, "central") + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) + + proc.procFunc = func() error { + for { + fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) + + m := fmt.Sprintf("Hello from %v\n", s.nodeName) + + sam := subjectAndMessage{ + Subject: Subject{ + ToNode: "central", + CommandOrEvent: EventNACK, + Method: SayHello, + }, + Message: Message{ + ToNode: "central", + FromNode: node(s.nodeName), + Data: []string{m}, + Method: SayHello, + }, + } + proc.newMessagesCh <- []subjectAndMessage{sam} + time.Sleep(time.Second * time.Duration(10)) + } + } + go proc.spawnWorker(s) + } + + //func() { + // for { + // sam := s.createMsg(fromNode) + // newMessagesCh <- []subjectAndMessage{sam} + // time.Sleep(time.Second * time.Duration(s.interval)) + // } + //}() }