1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

put in a ring buffer for new messages

This commit is contained in:
postmannen 2021-02-12 11:21:51 +01:00
parent 63c64aa669
commit fb02962231
5 changed files with 170 additions and 66 deletions

View file

@ -1,28 +0,0 @@
package steward
// // ringBuffer holds the data of the buffer,
// type ringBuffer struct {
// data []string
// }
//
// // newringBuffer is a push/pop storage for values.
// func newringBuffer() *ringBuffer {
// return &ringBuffer{}
// }
//
// // push will add another item to the end of the buffer with a normal append
// func (s *ringBuffer) push(d string) {
// s.data = append(s.data, d)
// }
//
// // pop will remove and return the first element of the buffer
// func (s *ringBuffer) pop() string {
// if len(s.data) == 0 {
// return ""
// }
//
// v := s.data[0]
// s.data = append(s.data[0:0], s.data[1:]...)
//
// return v
// }

View file

@ -6,5 +6,13 @@
"commandOrEvent":"command",
"method":"shellCommand"
},
{
"toNode": "ship1",
"data": ["bash","-c","ls -l ../"],
"commandOrEvent":"command",
"method":"shellCommand"
}
]

View file

@ -136,50 +136,71 @@ func (s *server) printProcessesMap() {
// handleNewOperatorMessages will handle all the new operator messages
// given to the system, and route them to the correct subject queue.
func (s *server) handleMessagesToPublish() {
// Process the messages that have been received on the incomming
// message pipe. Check and send if there are a specific subject
// for it, and no subject exist throw an error.
// Prepare and start a new ring buffer
const bufferSize int = 100
rb := newringBuffer(bufferSize)
inCh := make(chan subjectAndMessage)
outCh := make(chan subjectAndMessage)
rb.start(inCh, outCh)
// Start reading new messages received on the incomming message
// pipe requested by operator, and fill them into the buffer.
go func() {
for samSlice := range s.newMessagesCh {
for i, sam := range samSlice {
// Check if the format of the message is correct.
// TODO: Send a message to the error kernel here that
// it was unable to process the message with the reason
// why ?
if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
continue
}
if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) {
continue
}
fmt.Println("***** DEBUG ranging samSlice")
for _, sam := range samSlice {
fmt.Println("***** DEBUG putting on channel")
inCh <- sam
fmt.Println("***** DEBUG done putting on channel")
}
}
close(inCh)
}()
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present.
// Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and no subject
// exist throw an error.
go func() {
for sam := range outCh {
// Check if the format of the message is correct.
// TODO: Send a message to the error kernel here that
// it was unable to process the message with the reason
// why ?
if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
continue
}
if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) {
continue
}
redo:
m := sam.Message
subjName := sam.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName]
if ok {
log.Printf("info: found the specific subject: %v\n", subjName)
// Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m
} else {
// If a publisher do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present. The process will then be created, and
// the code will loop back to the redo: label.
sub := newSubject(samSlice[i].Subject.Node, samSlice[i].Subject.CommandOrEvent, samSlice[i].Subject.Method)
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
redo:
m := sam.Message
subjName := sam.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName]
if ok {
log.Printf("info: found the specific subject: %v\n", subjName)
// Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m
} else {
// If a publisher do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
time.Sleep(time.Millisecond * 500)
s.printProcessesMap()
goto redo
}
sub := newSubject(sam.Subject.Node, sam.Subject.CommandOrEvent, sam.Subject.Method)
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
time.Sleep(time.Millisecond * 500)
s.printProcessesMap()
// Now when the process is spawned we jump back to the redo: label.
goto redo
}
}
}()

49
ringbuffer.go Normal file
View file

@ -0,0 +1,49 @@
// Info: The idea about the ring buffer is that we have a FIFO
// buffer where we store all incomming messages requested by
// operators. Each message processed will also be stored in a DB.
//
// Idea: All incomming messages should be handled from the in-memory
// buffered channel, but when they are put on the buffer they should
// also be written to the DB with a handled flag set to false.
// When a message have left the buffer the handled flag should be
// set to true.
package steward
import "fmt"
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
buf chan subjectAndMessage
}
// newringBuffer is a push/pop storage for values.
func newringBuffer(size int) *ringBuffer {
return &ringBuffer{
buf: make(chan subjectAndMessage, size),
}
}
// start will process incomming messages through the inCh,
// and deliver messages out when requested on the outCh.
func (s *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
// Fill the buffer when new data arrives
go func() {
for v := range inCh {
s.buf <- v
fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v)
}
close(s.buf)
}()
// Empty the buffer when data asked for
go func() {
for v := range s.buf {
outCh <- v
}
close(outCh)
}()
}

54
tmp1/main.go Normal file
View file

@ -0,0 +1,54 @@
package main
import "fmt"
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
data chan string
}
// newringBuffer is a push/pop storage for values.
func newringBuffer() *ringBuffer {
return &ringBuffer{
data: make(chan string, 10),
}
}
// startWithChannels
func (s *ringBuffer) startWithChannels(inCh chan string, outCh chan string) {
// Fill the buffer when new data arrives
go func() {
for v := range inCh {
s.data <- v
fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v)
}
close(s.data)
}()
go func() {
for v := range s.data {
outCh <- v
}
close(outCh)
}()
}
func main() {
rb := newringBuffer()
inCh := make(chan string)
outCh := make(chan string)
rb.startWithChannels(inCh, outCh)
inCh <- "apekatt"
inCh <- "hest"
inCh <- "ku"
close(inCh)
for v := range outCh {
fmt.Printf("got: %v\n", v)
}
}