1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

processes type, and moved check out of server closer to where they are needed

This commit is contained in:
postmannen 2021-03-03 15:44:32 +01:00
parent d1b11c6bad
commit d38b9b03dc
3 changed files with 60 additions and 52 deletions

View file

@ -38,13 +38,15 @@ type process struct {
processKind processKind processKind processKind
// Who are we allowed to receive from ? // Who are we allowed to receive from ?
allowedReceivers map[node]struct{} allowedReceivers map[node]struct{}
// methodsAvailable
methodsAvailable MethodsAvailable
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func newProcess(s *server, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process { func newProcess(s *server, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++ s.processes.lastProcessID++
// make the slice of allowedReceivers into a map value for easy lookup. // make the slice of allowedReceivers into a map value for easy lookup.
m := make(map[node]struct{}) m := make(map[node]struct{})
@ -52,14 +54,17 @@ func newProcess(s *server, subject Subject, errCh chan errProcess, processKind p
m[a] = struct{}{} m[a] = struct{}{}
} }
var method Method
proc := process{ proc := process{
messageID: 0, messageID: 0,
subject: subject, subject: subject,
node: node(subject.ToNode), node: node(subject.ToNode),
processID: s.lastProcessID, processID: s.processes.lastProcessID,
errorCh: errCh, errorCh: errCh,
processKind: processKind, processKind: processKind,
allowedReceivers: m, allowedReceivers: m,
methodsAvailable: method.GetMethodsAvailable(),
} }
return proc return proc
@ -85,9 +90,9 @@ func (p process) spawnWorker(s *server) {
} }
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
s.mu.Lock() s.processes.mu.Lock()
s.processes[pn] = p s.processes.active[pn] = p
s.mu.Unlock() s.processes.mu.Unlock()
// Start a publisher worker, which will start a go routine (process) // Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
@ -204,7 +209,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
switch { switch {
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
mf, ok := s.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
// TODO: Check how errors should be handled here!!! // TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
@ -243,7 +248,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
} }
case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK:
log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
mf, ok := s.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
// TODO: Check how errors should be handled here!!! // TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
@ -287,13 +292,13 @@ func (p process) publishMessages(s *server) {
// Wait and read the next message on the message channel // Wait and read the next message on the message channel
m := <-p.subject.messageCh m := <-p.subject.messageCh
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = s.processes[pn].messageID m.ID = s.processes.active[pn].messageID
s.messageDeliverNats(p, m) s.messageDeliverNats(p, m)
m.done <- struct{}{} m.done <- struct{}{}
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
p.messageID++ p.messageID++
s.processes[pn] = p s.processes.active[pn] = p
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// NB: simulate that we get an error, and that we can send that // NB: simulate that we get an error, and that we can send that

View file

@ -22,7 +22,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
inCh := make(chan subjectAndMessage) inCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValue) ringBufferOutCh := make(chan samDBValue)
// start the ringbuffer. // start the ringbuffer.
rb.start(inCh, ringBufferOutCh, s.defaultMessageTimeout, s.defaultMessageRetries) rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries)
// Start reading new fresh messages received on the incomming message // Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer. // pipe/file requested, and fill them into the buffer.
@ -38,6 +38,13 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
// Process the messages that are in the ring buffer. Check and // Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and if no subject // send if there are a specific subject for it, and if no subject
// exist throw an error. // exist throw an error.
var coe CommandOrEvent
coeAvailable := coe.GetCommandOrEventAvailable()
var method Method
methodsAvailable := method.GetMethodsAvailable()
go func() { go func() {
for samTmp := range ringBufferOutCh { for samTmp := range ringBufferOutCh {
sam := samTmp.Data sam := samTmp.Data
@ -45,11 +52,11 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
// TODO: Send a message to the error kernel here that // TODO: Send a message to the error kernel here that
// it was unable to process the message with the reason // it was unable to process the message with the reason
// why ? // why ?
if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok { if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method) log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method)
continue continue
} }
if !s.commandOrEventAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent) log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent)
continue continue
} }
@ -64,13 +71,13 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
subjName := sam.Subject.name() subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet(subjName, processKindPublisher) pn := processNameGet(subjName, processKindPublisher)
_, ok := s.processes[pn] _, ok := s.processes.active[pn]
// Are there already a process for that subject, put the // Are there already a process for that subject, put the
// message on that processes incomming message channel. // message on that processes incomming message channel.
if ok { if ok {
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
s.processes[pn].subject.messageCh <- m s.processes.active[pn].subject.messageCh <- m
// If no process to handle the specific subject exist, // If no process to handle the specific subject exist,
// the we create and spawn one. // the we create and spawn one.

View file

@ -19,6 +19,25 @@ func processNameGet(sn subjectName, pk processKind) processName {
return processName(pn) return processName(pn)
} }
// processes holds all the information about running processes
type processes struct {
// The active spawned processes
active map[processName]process
// mutex to lock the map
mu sync.Mutex
// The last processID created
lastProcessID int
}
// newProcesses will prepare and return a *processes
func newProcesses() *processes {
p := processes{
active: make(map[processName]process),
}
return &p
}
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
// processes on a local instance. // processes on a local instance.
type server struct { type server struct {
@ -26,28 +45,16 @@ type server struct {
configuration *Configuration configuration *Configuration
// The nats connection to the broker // The nats connection to the broker
natsConn *nats.Conn natsConn *nats.Conn
// TODO: sessions should probably hold a slice/map of processes ? // processes holds all the information about running processes
processes map[processName]process processes *processes
// The last processID created
lastProcessID int
// The name of the node // The name of the node
nodeName string nodeName string
// Mutex for locking when writing to the process map // Mutex for locking when writing to the process map
mu sync.Mutex
// The channel where we put new messages read from file,
// or some other process who wants to send something via the
// system
// We can than range this channel for new messages to process.
newMessagesCh chan []subjectAndMessage newMessagesCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if // errorKernel is doing all the error handling like what to do if
// an error occurs. // an error occurs.
// TODO: Will also send error messages to cental error subscriber. // TODO: Will also send error messages to cental error subscriber.
errorKernel *errorKernel errorKernel *errorKernel
// used to check if the methods specified in message is valid
methodsAvailable MethodsAvailable
// Map who holds the command and event types available.
// Used to check if the commandOrEvent specified in message is valid
commandOrEventAvailable CommandOrEventAvailable
// metric exporter // metric exporter
metrics *metrics metrics *metrics
// subscriberServices are where we find the services and the API to // subscriberServices are where we find the services and the API to
@ -60,10 +67,6 @@ type server struct {
// collection of the publisher services and the types to control them // collection of the publisher services and the types to control them
publisherServices *publisherServices publisherServices *publisherServices
centralErrorLogger bool centralErrorLogger bool
// default message timeout in seconds. This can be overridden on the message level
defaultMessageTimeout int
// default amount of retries that will be done before a message is thrown away, and out of the system
defaultMessageRetries int
} }
// newServer will prepare and return a server type // newServer will prepare and return a server type
@ -73,23 +76,16 @@ func NewServer(c *Configuration) (*server, error) {
log.Printf("error: nats.Connect failed: %v\n", err) log.Printf("error: nats.Connect failed: %v\n", err)
} }
var m Method
var coe CommandOrEvent
s := &server{ s := &server{
configuration: c, configuration: c,
nodeName: c.NodeName, nodeName: c.NodeName,
natsConn: conn, natsConn: conn,
processes: make(map[processName]process), processes: newProcesses(),
newMessagesCh: make(chan []subjectAndMessage), newMessagesCh: make(chan []subjectAndMessage),
methodsAvailable: m.GetMethodsAvailable(), metrics: newMetrics(c.PromHostAndPort),
commandOrEventAvailable: coe.GetCommandOrEventAvailable(), subscriberServices: newSubscriberServices(),
metrics: newMetrics(c.PromHostAndPort), publisherServices: newPublisherServices(c.PublisherServiceSayhello),
subscriberServices: newSubscriberServices(), centralErrorLogger: c.CentralErrorLogger,
publisherServices: newPublisherServices(c.PublisherServiceSayhello),
centralErrorLogger: c.CentralErrorLogger,
defaultMessageTimeout: c.DefaultMessageTimeout,
defaultMessageRetries: c.DefaultMessageRetries,
} }
// Create the default data folder for where subscribers should // Create the default data folder for where subscribers should
@ -151,18 +147,18 @@ func (s *server) Start() {
func (s *server) printProcessesMap() { func (s *server) printProcessesMap() {
fmt.Println("--------------------------------------------------------------------------------------------") fmt.Println("--------------------------------------------------------------------------------------------")
fmt.Printf("*** Output of processes map :\n") fmt.Printf("*** Output of processes map :\n")
s.mu.Lock() s.processes.mu.Lock()
for _, v := range s.processes { for _, v := range s.processes.active {
fmt.Printf("*** - : %v\n", v) fmt.Printf("*** - : %v\n", v)
} }
s.mu.Unlock() s.processes.mu.Unlock()
s.metrics.metricsCh <- metricType{ s.metrics.metricsCh <- metricType{
metric: prometheus.NewGauge(prometheus.GaugeOpts{ metric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "total_running_processes", Name: "total_running_processes",
Help: "The current number of total running processes", Help: "The current number of total running processes",
}), }),
value: float64(len(s.processes)), value: float64(len(s.processes.active)),
} }
fmt.Println("--------------------------------------------------------------------------------------------") fmt.Println("--------------------------------------------------------------------------------------------")