From fb0296223167c5204db5d7e0473de03e0ef4b1db Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 12 Feb 2021 11:21:51 +0100 Subject: [PATCH] put in a ring buffer for new messages --- erroringbuffer.go | 28 --------- example-inmessage/orig-ship1.json | 8 +++ publisher.go | 97 +++++++++++++++++++------------ ringbuffer.go | 49 ++++++++++++++++ tmp1/main.go | 54 +++++++++++++++++ 5 files changed, 170 insertions(+), 66 deletions(-) delete mode 100644 erroringbuffer.go create mode 100644 ringbuffer.go create mode 100644 tmp1/main.go diff --git a/erroringbuffer.go b/erroringbuffer.go deleted file mode 100644 index 745f6a9..0000000 --- a/erroringbuffer.go +++ /dev/null @@ -1,28 +0,0 @@ -package steward - -// // ringBuffer holds the data of the buffer, -// type ringBuffer struct { -// data []string -// } -// -// // newringBuffer is a push/pop storage for values. -// func newringBuffer() *ringBuffer { -// return &ringBuffer{} -// } -// -// // push will add another item to the end of the buffer with a normal append -// func (s *ringBuffer) push(d string) { -// s.data = append(s.data, d) -// } -// -// // pop will remove and return the first element of the buffer -// func (s *ringBuffer) pop() string { -// if len(s.data) == 0 { -// return "" -// } -// -// v := s.data[0] -// s.data = append(s.data[0:0], s.data[1:]...) -// -// return v -// } diff --git a/example-inmessage/orig-ship1.json b/example-inmessage/orig-ship1.json index a69ce69..9dea9c6 100644 --- a/example-inmessage/orig-ship1.json +++ b/example-inmessage/orig-ship1.json @@ -6,5 +6,13 @@ "commandOrEvent":"command", "method":"shellCommand" + }, + { + + "toNode": "ship1", + "data": ["bash","-c","ls -l ../"], + "commandOrEvent":"command", + "method":"shellCommand" + } ] \ No newline at end of file diff --git a/publisher.go b/publisher.go index 5bab5c0..f193df2 100644 --- a/publisher.go +++ b/publisher.go @@ -136,50 +136,71 @@ func (s *server) printProcessesMap() { // handleNewOperatorMessages will handle all the new operator messages // given to the system, and route them to the correct subject queue. func (s *server) handleMessagesToPublish() { - // Process the messages that have been received on the incomming - // message pipe. Check and send if there are a specific subject - // for it, and no subject exist throw an error. + // Prepare and start a new ring buffer + const bufferSize int = 100 + rb := newringBuffer(bufferSize) + inCh := make(chan subjectAndMessage) + outCh := make(chan subjectAndMessage) + rb.start(inCh, outCh) + + // Start reading new messages received on the incomming message + // pipe requested by operator, and fill them into the buffer. go func() { for samSlice := range s.newMessagesCh { - for i, sam := range samSlice { - // Check if the format of the message is correct. - // TODO: Send a message to the error kernel here that - // it was unable to process the message with the reason - // why ? - if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok { - continue - } - if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) { - continue - } + fmt.Println("***** DEBUG ranging samSlice") + for _, sam := range samSlice { + fmt.Println("***** DEBUG putting on channel") + inCh <- sam + fmt.Println("***** DEBUG done putting on channel") + } + } + close(inCh) + }() - // Adding a label here so we are able to redo the sending - // of the last message if a process with specified subject - // is not present. + // Process the messages that are in the ring buffer. Check and + // send if there are a specific subject for it, and no subject + // exist throw an error. + go func() { + for sam := range outCh { + // Check if the format of the message is correct. + // TODO: Send a message to the error kernel here that + // it was unable to process the message with the reason + // why ? + if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok { + continue + } + if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) { + continue + } - redo: - m := sam.Message - subjName := sam.Subject.name() - fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) - _, ok := s.processes[subjName] - if ok { - log.Printf("info: found the specific subject: %v\n", subjName) - // Put the message on the correct process's messageCh - s.processes[subjName].subject.messageCh <- m - } else { - // If a publisher do not exist for the given subject, create it, and - // by using the goto at the end redo the process for this specific message. - log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName) + // Adding a label here so we are able to redo the sending + // of the last message if a process with specified subject + // is not present. The process will then be created, and + // the code will loop back to the redo: label. - sub := newSubject(samSlice[i].Subject.Node, samSlice[i].Subject.CommandOrEvent, samSlice[i].Subject.Method) - proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) - // fmt.Printf("*** %#v\n", proc) - go s.processSpawnWorker(proc) + redo: + m := sam.Message + subjName := sam.Subject.name() + fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) + _, ok := s.processes[subjName] + if ok { + log.Printf("info: found the specific subject: %v\n", subjName) + // Put the message on the correct process's messageCh + s.processes[subjName].subject.messageCh <- m + } else { + // If a publisher do not exist for the given subject, create it, and + // by using the goto at the end redo the process for this specific message. + log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName) - time.Sleep(time.Millisecond * 500) - s.printProcessesMap() - goto redo - } + sub := newSubject(sam.Subject.Node, sam.Subject.CommandOrEvent, sam.Subject.Method) + proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) + // fmt.Printf("*** %#v\n", proc) + go s.processSpawnWorker(proc) + + time.Sleep(time.Millisecond * 500) + s.printProcessesMap() + // Now when the process is spawned we jump back to the redo: label. + goto redo } } }() diff --git a/ringbuffer.go b/ringbuffer.go new file mode 100644 index 0000000..96c52cb --- /dev/null +++ b/ringbuffer.go @@ -0,0 +1,49 @@ +// Info: The idea about the ring buffer is that we have a FIFO +// buffer where we store all incomming messages requested by +// operators. Each message processed will also be stored in a DB. +// +// Idea: All incomming messages should be handled from the in-memory +// buffered channel, but when they are put on the buffer they should +// also be written to the DB with a handled flag set to false. +// When a message have left the buffer the handled flag should be +// set to true. +package steward + +import "fmt" + +// ringBuffer holds the data of the buffer, +type ringBuffer struct { + buf chan subjectAndMessage +} + +// newringBuffer is a push/pop storage for values. +func newringBuffer(size int) *ringBuffer { + return &ringBuffer{ + buf: make(chan subjectAndMessage, size), + } +} + +// start will process incomming messages through the inCh, +// and deliver messages out when requested on the outCh. +func (s *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) { + // Starting both writing and reading in separate go routines so we + // can write and read concurrently. + + // Fill the buffer when new data arrives + go func() { + for v := range inCh { + s.buf <- v + fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v) + } + close(s.buf) + }() + + // Empty the buffer when data asked for + go func() { + for v := range s.buf { + outCh <- v + } + + close(outCh) + }() +} diff --git a/tmp1/main.go b/tmp1/main.go new file mode 100644 index 0000000..0ed62c8 --- /dev/null +++ b/tmp1/main.go @@ -0,0 +1,54 @@ +package main + +import "fmt" + +// ringBuffer holds the data of the buffer, +type ringBuffer struct { + data chan string +} + +// newringBuffer is a push/pop storage for values. +func newringBuffer() *ringBuffer { + return &ringBuffer{ + data: make(chan string, 10), + } +} + +// startWithChannels +func (s *ringBuffer) startWithChannels(inCh chan string, outCh chan string) { + // Fill the buffer when new data arrives + go func() { + for v := range inCh { + s.data <- v + fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v) + } + close(s.data) + }() + + go func() { + for v := range s.data { + outCh <- v + } + + close(outCh) + + }() +} + +func main() { + rb := newringBuffer() + inCh := make(chan string) + outCh := make(chan string) + + rb.startWithChannels(inCh, outCh) + + inCh <- "apekatt" + inCh <- "hest" + inCh <- "ku" + close(inCh) + + for v := range outCh { + fmt.Printf("got: %v\n", v) + } + +}