mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
removed process dependency to server
This commit is contained in:
parent
2a19ae451b
commit
f2a07010d9
4 changed files with 142 additions and 122 deletions
|
@ -52,7 +52,10 @@ type Subject struct {
|
||||||
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
|
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
|
||||||
// method, what is this message doing, etc. CLICommand, Syslog, etc.
|
// method, what is this message doing, etc. CLICommand, Syslog, etc.
|
||||||
Method Method `json:"method" yaml:"method"`
|
Method Method `json:"method" yaml:"method"`
|
||||||
// messageCh is the channel for receiving new content to be sent
|
// messageCh is used by publisher kind processes to read new messages
|
||||||
|
// to be published. The content on this channel have been routed here
|
||||||
|
// from routeMessagesToPublish in *server.
|
||||||
|
// This channel is only used for publishing processes.
|
||||||
messageCh chan Message
|
messageCh chan Message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
54
process.go
54
process.go
|
@ -113,7 +113,20 @@ func (p process) spawnWorker(s *server) {
|
||||||
// Start a publisher worker, which will start a go routine (process)
|
// Start a publisher worker, which will start a go routine (process)
|
||||||
// That will take care of all the messages for the subject it owns.
|
// That will take care of all the messages for the subject it owns.
|
||||||
if p.processKind == processKindPublisher {
|
if p.processKind == processKindPublisher {
|
||||||
go p.publishMessages(s)
|
// If there is a procFunc for the process, start it.
|
||||||
|
if p.procFunc != nil {
|
||||||
|
// REMOVED: p.procFuncCh = make(chan Message)
|
||||||
|
// Start the procFunc in it's own anonymous func so we are able
|
||||||
|
// to get the return error.
|
||||||
|
go func() {
|
||||||
|
err := p.procFunc()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: spawnWorker: procFunc failed: %v\n", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
go p.publishMessages(s.natsConn, s.processes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber worker, which will start a go routine (process)
|
// Start a subscriber worker, which will start a go routine (process)
|
||||||
|
@ -140,7 +153,7 @@ func (p process) spawnWorker(s *server) {
|
||||||
// messageDeliverNats will take care of the delivering the message
|
// messageDeliverNats will take care of the delivering the message
|
||||||
// as converted to gob format as a nats.Message. It will also take
|
// as converted to gob format as a nats.Message. It will also take
|
||||||
// care of checking timeouts and retries specified for the message.
|
// care of checking timeouts and retries specified for the message.
|
||||||
func (s *server) messageDeliverNats(proc process, message Message) {
|
func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
retryAttempts := 0
|
retryAttempts := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -150,11 +163,11 @@ func (s *server) messageDeliverNats(proc process, message Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &nats.Msg{
|
msg := &nats.Msg{
|
||||||
Subject: string(proc.subject.name()),
|
Subject: string(p.subject.name()),
|
||||||
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"),
|
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"),
|
||||||
// Structure of the reply message are:
|
// Structure of the reply message are:
|
||||||
// reply.<nodename>.<message type>.<method>
|
// reply.<nodename>.<message type>.<method>
|
||||||
Reply: fmt.Sprintf("reply.%s", proc.subject.name()),
|
Reply: fmt.Sprintf("reply.%s", p.subject.name()),
|
||||||
Data: dataPayload,
|
Data: dataPayload,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,14 +176,14 @@ func (s *server) messageDeliverNats(proc process, message Message) {
|
||||||
// that sends out a message every second.
|
// that sends out a message every second.
|
||||||
//
|
//
|
||||||
// Create a subscriber for the reply message.
|
// Create a subscriber for the reply message.
|
||||||
subReply, err := s.natsConn.SubscribeSync(msg.Reply)
|
subReply, err := natsConn.SubscribeSync(msg.Reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err)
|
log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish message
|
// Publish message
|
||||||
err = s.natsConn.PublishMsg(msg)
|
err = natsConn.PublishMsg(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: publish failed: %v\n", err)
|
log.Printf("error: publish failed: %v\n", err)
|
||||||
continue
|
continue
|
||||||
|
@ -179,13 +192,13 @@ func (s *server) messageDeliverNats(proc process, message Message) {
|
||||||
// If the message is an ACK type of message we must check that a
|
// If the message is an ACK type of message we must check that a
|
||||||
// reply, and if it is not we don't wait here at all.
|
// reply, and if it is not we don't wait here at all.
|
||||||
fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message)
|
fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message)
|
||||||
if proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK {
|
if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK {
|
||||||
// Wait up until timeout specified for a reply,
|
// Wait up until timeout specified for a reply,
|
||||||
// continue and resend if noo reply received,
|
// continue and resend if noo reply received,
|
||||||
// or exit if max retries for the message reached.
|
// or exit if max retries for the message reached.
|
||||||
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout))
|
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err)
|
log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", p.node, p.subject.name(), err)
|
||||||
|
|
||||||
// did not receive a reply, decide what to do..
|
// did not receive a reply, decide what to do..
|
||||||
retryAttempts++
|
retryAttempts++
|
||||||
|
@ -239,7 +252,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
switch {
|
switch {
|
||||||
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
|
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
|
||||||
// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
|
// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
|
||||||
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
if !ok {
|
if !ok {
|
||||||
// TODO: Check how errors should be handled here!!!
|
// TODO: Check how errors should be handled here!!!
|
||||||
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
|
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
|
||||||
|
@ -257,7 +270,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
// The handler started here is what actually doing the action
|
// The handler started here is what actually doing the action
|
||||||
// that executed a CLI command, or writes to a log file on
|
// that executed a CLI command, or writes to a log file on
|
||||||
// the node who received the message.
|
// the node who received the message.
|
||||||
out, err = mf.handler(p, message, thisNode)
|
out, err = mh.handler(p, message, thisNode)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: Send to error kernel ?
|
// TODO: Send to error kernel ?
|
||||||
|
@ -320,20 +333,29 @@ func (p process) subscribeMessages(s *server) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p process) publishMessages(s *server) {
|
// publishMessages will do the publishing of messages for one single
|
||||||
|
// process.
|
||||||
|
func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
|
||||||
for {
|
for {
|
||||||
// Wait and read the next message on the message channel
|
// Wait and read the next message on the message channel
|
||||||
m := <-p.subject.messageCh
|
m := <-p.subject.messageCh
|
||||||
|
|
||||||
|
// Get the process name so we can look up the process in the
|
||||||
|
// processes map, and increment the message counter.
|
||||||
pn := processNameGet(p.subject.name(), processKindPublisher)
|
pn := processNameGet(p.subject.name(), processKindPublisher)
|
||||||
m.ID = s.processes.active[pn].messageID
|
m.ID = processes.active[pn].messageID
|
||||||
s.messageDeliverNats(p, m)
|
|
||||||
|
p.messageDeliverNats(natsConn, m)
|
||||||
|
|
||||||
|
// Signaling back to the ringbuffer that we are done with the
|
||||||
|
// current message, and it can remove it from the ringbuffer.
|
||||||
m.done <- struct{}{}
|
m.done <- struct{}{}
|
||||||
|
|
||||||
// Increment the counter for the next message to be sent.
|
// Increment the counter for the next message to be sent.
|
||||||
p.messageID++
|
p.messageID++
|
||||||
s.processes.mu.Lock()
|
processes.mu.Lock()
|
||||||
s.processes.active[pn] = p
|
processes.active[pn] = p
|
||||||
s.processes.mu.Unlock()
|
processes.mu.Unlock()
|
||||||
// REMOVED: sleep
|
// REMOVED: sleep
|
||||||
//time.Sleep(time.Second * 1)
|
//time.Sleep(time.Second * 1)
|
||||||
|
|
||||||
|
|
102
publisher.go
102
publisher.go
|
@ -1,102 +0,0 @@
|
||||||
package steward
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// processNewMessages takes a database name and an input channel as
|
|
||||||
// it's input arguments.
|
|
||||||
// The database will be used as the persistent store for the work queue
|
|
||||||
// which is implemented as a ring buffer.
|
|
||||||
// The input channel are where we read new messages to publish.
|
|
||||||
// Incomming messages will be routed to the correct subject process, where
|
|
||||||
// the handling of each nats subject is handled within it's own separate
|
|
||||||
// worker process.
|
|
||||||
// It will also handle the process of spawning more worker processes
|
|
||||||
// for publisher subjects if it does not exist.
|
|
||||||
func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndMessage) {
|
|
||||||
// Prepare and start a new ring buffer
|
|
||||||
const bufferSize int = 1000
|
|
||||||
rb := newringBuffer(bufferSize, dbFileName)
|
|
||||||
inCh := make(chan subjectAndMessage)
|
|
||||||
ringBufferOutCh := make(chan samDBValue)
|
|
||||||
// start the ringbuffer.
|
|
||||||
rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries)
|
|
||||||
|
|
||||||
// Start reading new fresh messages received on the incomming message
|
|
||||||
// pipe/file requested, and fill them into the buffer.
|
|
||||||
go func() {
|
|
||||||
for samSlice := range newSAM {
|
|
||||||
for _, sam := range samSlice {
|
|
||||||
inCh <- sam
|
|
||||||
}
|
|
||||||
}
|
|
||||||
close(inCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Process the messages that are in the ring buffer. Check and
|
|
||||||
// send if there are a specific subject for it, and if no subject
|
|
||||||
// exist throw an error.
|
|
||||||
|
|
||||||
var coe CommandOrEvent
|
|
||||||
coeAvailable := coe.GetCommandOrEventAvailable()
|
|
||||||
|
|
||||||
var method Method
|
|
||||||
methodsAvailable := method.GetMethodsAvailable()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for samTmp := range ringBufferOutCh {
|
|
||||||
sam := samTmp.Data
|
|
||||||
// Check if the format of the message is correct.
|
|
||||||
// TODO: Send a message to the error kernel here that
|
|
||||||
// it was unable to process the message with the reason
|
|
||||||
// why ?
|
|
||||||
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
|
||||||
log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
|
||||||
log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
redo:
|
|
||||||
// Adding a label here so we are able to redo the sending
|
|
||||||
// of the last message if a process with specified subject
|
|
||||||
// is not present. The process will then be created, and
|
|
||||||
// the code will loop back to the redo: label.
|
|
||||||
|
|
||||||
m := sam.Message
|
|
||||||
subjName := sam.Subject.name()
|
|
||||||
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
|
|
||||||
pn := processNameGet(subjName, processKindPublisher)
|
|
||||||
_, ok := s.processes.active[pn]
|
|
||||||
|
|
||||||
// Are there already a process for that subject, put the
|
|
||||||
// message on that processes incomming message channel.
|
|
||||||
if ok {
|
|
||||||
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
|
|
||||||
s.processes.active[pn].subject.messageCh <- m
|
|
||||||
|
|
||||||
// If no process to handle the specific subject exist,
|
|
||||||
// the we create and spawn one.
|
|
||||||
} else {
|
|
||||||
// If a publisher process do not exist for the given subject, create it, and
|
|
||||||
// by using the goto at the end redo the process for this specific message.
|
|
||||||
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
|
||||||
|
|
||||||
sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
|
|
||||||
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
|
|
||||||
// fmt.Printf("*** %#v\n", proc)
|
|
||||||
proc.spawnWorker(s)
|
|
||||||
|
|
||||||
// REMOVED:
|
|
||||||
//time.Sleep(time.Millisecond * 500)
|
|
||||||
s.printProcessesMap()
|
|
||||||
// Now when the process is spawned we jump back to the redo: label,
|
|
||||||
// and send the message to that new process.
|
|
||||||
goto redo
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
103
server.go
103
server.go
|
@ -25,7 +25,7 @@ type processes struct {
|
||||||
// The active spawned processes
|
// The active spawned processes
|
||||||
active map[processName]process
|
active map[processName]process
|
||||||
// mutex to lock the map
|
// mutex to lock the map
|
||||||
mu sync.Mutex
|
mu sync.RWMutex
|
||||||
// The last processID created
|
// The last processID created
|
||||||
lastProcessID int
|
lastProcessID int
|
||||||
}
|
}
|
||||||
|
@ -131,8 +131,8 @@ func (s *server) Start() {
|
||||||
time.Sleep(time.Second * 1)
|
time.Sleep(time.Second * 1)
|
||||||
s.printProcessesMap()
|
s.printProcessesMap()
|
||||||
|
|
||||||
// Start the processing of new messaging from an input channel.
|
// Start the processing of new messages from an input channel.
|
||||||
s.processNewMessages("./incommmingBuffer.db", s.newMessagesCh)
|
s.routeMessagesToPublish("./incommmingBuffer.db", s.newMessagesCh)
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
||||||
|
@ -187,3 +187,100 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
|
||||||
|
|
||||||
return sam
|
return sam
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// routeMessagesToPublish takes a database name and an input channel as
|
||||||
|
// it's input arguments.
|
||||||
|
// The database will be used as the persistent store for the work queue
|
||||||
|
// which is implemented as a ring buffer.
|
||||||
|
// The input channel are where we read new messages to publish.
|
||||||
|
// Incomming messages will be routed to the correct subject process, where
|
||||||
|
// the handling of each nats subject is handled within it's own separate
|
||||||
|
// worker process.
|
||||||
|
// It will also handle the process of spawning more worker processes
|
||||||
|
// for publisher subjects if it does not exist.
|
||||||
|
func (s *server) routeMessagesToPublish(dbFileName string, newSAM chan []subjectAndMessage) {
|
||||||
|
// Prepare and start a new ring buffer
|
||||||
|
const bufferSize int = 1000
|
||||||
|
rb := newringBuffer(bufferSize, dbFileName)
|
||||||
|
inCh := make(chan subjectAndMessage)
|
||||||
|
ringBufferOutCh := make(chan samDBValue)
|
||||||
|
// start the ringbuffer.
|
||||||
|
rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries)
|
||||||
|
|
||||||
|
// Start reading new fresh messages received on the incomming message
|
||||||
|
// pipe/file requested, and fill them into the buffer.
|
||||||
|
go func() {
|
||||||
|
for samSlice := range newSAM {
|
||||||
|
for _, sam := range samSlice {
|
||||||
|
inCh <- sam
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(inCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Process the messages that are in the ring buffer. Check and
|
||||||
|
// send if there are a specific subject for it, and if no subject
|
||||||
|
// exist throw an error.
|
||||||
|
|
||||||
|
var coe CommandOrEvent
|
||||||
|
coeAvailable := coe.GetCommandOrEventAvailable()
|
||||||
|
|
||||||
|
var method Method
|
||||||
|
methodsAvailable := method.GetMethodsAvailable()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for samTmp := range ringBufferOutCh {
|
||||||
|
sam := samTmp.Data
|
||||||
|
// Check if the format of the message is correct.
|
||||||
|
// TODO: Send a message to the error kernel here that
|
||||||
|
// it was unable to process the message with the reason
|
||||||
|
// why ?
|
||||||
|
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
||||||
|
log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
||||||
|
log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
redo:
|
||||||
|
// Adding a label here so we are able to redo the sending
|
||||||
|
// of the last message if a process with specified subject
|
||||||
|
// is not present. The process will then be created, and
|
||||||
|
// the code will loop back to the redo: label.
|
||||||
|
|
||||||
|
m := sam.Message
|
||||||
|
subjName := sam.Subject.name()
|
||||||
|
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
|
||||||
|
pn := processNameGet(subjName, processKindPublisher)
|
||||||
|
_, ok := s.processes.active[pn]
|
||||||
|
|
||||||
|
// Are there already a process for that subject, put the
|
||||||
|
// message on that processes incomming message channel.
|
||||||
|
if ok {
|
||||||
|
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
|
||||||
|
s.processes.active[pn].subject.messageCh <- m
|
||||||
|
|
||||||
|
// If no process to handle the specific subject exist,
|
||||||
|
// the we create and spawn one.
|
||||||
|
} else {
|
||||||
|
// If a publisher process do not exist for the given subject, create it, and
|
||||||
|
// by using the goto at the end redo the process for this specific message.
|
||||||
|
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
||||||
|
|
||||||
|
sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
|
||||||
|
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
|
||||||
|
// fmt.Printf("*** %#v\n", proc)
|
||||||
|
proc.spawnWorker(s)
|
||||||
|
|
||||||
|
// REMOVED:
|
||||||
|
//time.Sleep(time.Millisecond * 500)
|
||||||
|
s.printProcessesMap()
|
||||||
|
// Now when the process is spawned we jump back to the redo: label,
|
||||||
|
// and send the message to that new process.
|
||||||
|
goto redo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue