From a042b51672435e303c13412b4d42274572b59c92 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 25 Aug 2021 06:38:23 +0200 Subject: [PATCH] renamed channel --- ringbuffer.go | 24 ++++++++++++------------ server.go | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/ringbuffer.go b/ringbuffer.go index 7174661..0edcca0 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -46,12 +46,12 @@ type ringBuffer struct { // Name of node. nodeName Node // New messages to the system to be put into the ring buffer. - newMessagesCh chan []subjectAndMessage - metrics *metrics + toRingbuffer chan []subjectAndMessage + metrics *metrics } // newringBuffer returns a push/pop storage for values. -func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { +func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, toRingbuffer chan []subjectAndMessage) *ringBuffer { // Check if socket folder exists, if not create it if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) { err := os.MkdirAll(c.DatabaseFolder, 0700) @@ -91,12 +91,12 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent) return &ringBuffer{ - bufData: make(chan samDBValue, size), - db: db, - permStore: make(chan string), - nodeName: nodeName, - newMessagesCh: newMessagesCh, - metrics: metrics, + bufData: make(chan samDBValue, size), + db: db, + permStore: make(chan string), + nodeName: nodeName, + toRingbuffer: toRingbuffer, + metrics: metrics, } } @@ -162,7 +162,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri // Check if the command or event exists in commandOrEvent.go if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) { er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject) - sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) + sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er) fmt.Println() // if it was not a valid value, we jump back up, and @@ -195,14 +195,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri js, err := json.Marshal(samV) if err != nil { er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) - sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) + sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er) } // Store the incomming message in key/value store err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js) if err != nil { er := fmt.Errorf("error: dbUpdate samValue failed: %v", err) - sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) + sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er) } diff --git a/server.go b/server.go index 1ab2c04..54731ae 100644 --- a/server.go +++ b/server.go @@ -331,6 +331,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject // Prepare and start a new ring buffer const bufferSize int = 1000 rb := newringBuffer(s.metrics, *s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh) + // TODO: inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValueAndDelivered) // start the ringbuffer.