mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
renamed channel
This commit is contained in:
parent
a56321d61e
commit
a042b51672
2 changed files with 13 additions and 12 deletions
|
@ -46,12 +46,12 @@ type ringBuffer struct {
|
|||
// Name of node.
|
||||
nodeName Node
|
||||
// New messages to the system to be put into the ring buffer.
|
||||
newMessagesCh chan []subjectAndMessage
|
||||
metrics *metrics
|
||||
toRingbuffer chan []subjectAndMessage
|
||||
metrics *metrics
|
||||
}
|
||||
|
||||
// newringBuffer returns a push/pop storage for values.
|
||||
func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
||||
func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, toRingbuffer chan []subjectAndMessage) *ringBuffer {
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.DatabaseFolder, 0700)
|
||||
|
@ -91,12 +91,12 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin
|
|||
metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent)
|
||||
|
||||
return &ringBuffer{
|
||||
bufData: make(chan samDBValue, size),
|
||||
db: db,
|
||||
permStore: make(chan string),
|
||||
nodeName: nodeName,
|
||||
newMessagesCh: newMessagesCh,
|
||||
metrics: metrics,
|
||||
bufData: make(chan samDBValue, size),
|
||||
db: db,
|
||||
permStore: make(chan string),
|
||||
nodeName: nodeName,
|
||||
toRingbuffer: toRingbuffer,
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,7 +162,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
|||
// Check if the command or event exists in commandOrEvent.go
|
||||
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
||||
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
|
||||
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||
sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er)
|
||||
|
||||
fmt.Println()
|
||||
// if it was not a valid value, we jump back up, and
|
||||
|
@ -195,14 +195,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
|||
js, err := json.Marshal(samV)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
|
||||
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||
sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er)
|
||||
}
|
||||
|
||||
// Store the incomming message in key/value store
|
||||
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
||||
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||
sendErrorLogMessage(r.toRingbuffer, Node(r.nodeName), er)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -331,6 +331,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
|
|||
// Prepare and start a new ring buffer
|
||||
const bufferSize int = 1000
|
||||
rb := newringBuffer(s.metrics, *s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh)
|
||||
// TODO:
|
||||
inCh := make(chan subjectAndMessage)
|
||||
ringBufferOutCh := make(chan samDBValueAndDelivered)
|
||||
// start the ringbuffer.
|
||||
|
|
Loading…
Reference in a new issue