1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

renamed newMessagesCh to ringBufferBulkInCh

This commit is contained in:
postmannen 2022-01-20 08:17:37 +01:00
parent 698706b41f
commit ccd57dad10
4 changed files with 47 additions and 66 deletions

View file

@ -52,7 +52,7 @@ func newErrorKernel(ctx context.Context, m *metrics) *errorKernel {
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error {
// NOTE: For now it will just print the error messages to the
// console.
@ -104,7 +104,7 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
}
// Put the message on the channel to the ringbuffer.
newMessagesCh <- []subjectAndMessage{sam}
ringBufferBulkInCh <- []subjectAndMessage{sam}
e.metrics.promErrorMessagesSentTotal.Inc()
}()

View file

@ -62,7 +62,7 @@ func (s *server) readSocket() {
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sams
s.ringBufferBulkInCh <- sams
}(conn)
}
@ -127,7 +127,7 @@ func (s *server) readTCPListener() {
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sam
s.ringBufferBulkInCh <- sam
}(conn)
}
@ -171,7 +171,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sam
s.ringBufferBulkInCh <- sam
}

View file

@ -47,18 +47,18 @@ type ringBuffer struct {
permStore chan string
// Name of node.
nodeName Node
// newMessagesCh from *server are also implemented here,
// ringBufferBulkInCh from *server are also implemented here,
// so the ringbuffer can send it's error messages the same
// way as all messages are handled.
newMessagesCh chan []subjectAndMessage
metrics *metrics
configuration *Configuration
errorKernel *errorKernel
processInitial process
ringBufferBulkInCh chan []subjectAndMessage
metrics *metrics
configuration *Configuration
errorKernel *errorKernel
processInitial process
}
// newringBuffer returns a push/pop storage for values.
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
// Check if socket folder exists, if not create it
if _, err := os.Stat(configuration.DatabaseFolder); os.IsNotExist(err) {
@ -80,16 +80,16 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu
}
return &ringBuffer{
bufData: make(chan samDBValue, size),
db: db,
samValueBucket: samValueBucket,
indexValueBucket: indexValueBucket,
permStore: make(chan string),
nodeName: nodeName,
newMessagesCh: newMessagesCh,
metrics: metrics,
configuration: configuration,
processInitial: processInitial,
bufData: make(chan samDBValue, size),
db: db,
samValueBucket: samValueBucket,
indexValueBucket: indexValueBucket,
permStore: make(chan string),
nodeName: nodeName,
ringBufferBulkInCh: ringBufferBulkInCh,
metrics: metrics,
configuration: configuration,
processInitial: processInitial,
}
}
@ -141,7 +141,6 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err)
log.Printf("%v\n", er)
return
//sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
}
for _, v := range s {

View file

@ -40,17 +40,12 @@ type server struct {
processes *processes
// The name of the node
nodeName string
// newMessagesCh are the channel where new messages to be handled
// by the system are put. So if any process want's to send a message
// like an error message you just put the message on the newMessagesCh.
// ringBufferBulkInCh are the channel where new messages in a bulk
// format (slice) are put into the system.
//
// In general the ringbuffer will read this
// channel for messages to put on the buffer.
//
// Example:
// A message is read from the socket, then put on the newMessagesCh
// and then put on the ringbuffer.
newMessagesCh chan []subjectAndMessage
// channel, unfold each slice, and put single messages on the buffer.
ringBufferBulkInCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
@ -142,18 +137,18 @@ func NewServer(c *Configuration, version string) (*server, error) {
}
s := &server{
ctx: ctx,
cancel: cancel,
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
StewardSocket: stewardSocket,
processes: newProcesses(ctx, metrics, tuiClient, errorKernel),
newMessagesCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
tui: tuiClient,
errorKernel: errorKernel,
ctx: ctx,
cancel: cancel,
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
StewardSocket: stewardSocket,
processes: newProcesses(ctx, metrics, tuiClient, errorKernel),
ringBufferBulkInCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
tui: tuiClient,
errorKernel: errorKernel,
}
// Create the default data folder for where subscribers should
@ -214,7 +209,7 @@ func (s *server) Start() {
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
go func() {
err := s.errorKernel.start(s.newMessagesCh)
err := s.errorKernel.start(s.ringBufferBulkInCh)
if err != nil {
log.Printf("%v\n", err)
}
@ -251,7 +246,7 @@ func (s *server) Start() {
//
// NB: The context of the initial process are set in processes.Start.
sub := newSubject(REQInitial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, "", nil)
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil)
// Start all wanted subscriber processes.
s.processes.Start(s.processInitial)
@ -266,7 +261,7 @@ func (s *server) Start() {
if s.configuration.EnableTUI {
go func() {
err := s.tui.Start(s.ctx, s.newMessagesCh)
err := s.tui.Start(s.ctx, s.ringBufferBulkInCh)
if err != nil {
log.Printf("%v\n", err)
os.Exit(1)
@ -308,26 +303,13 @@ func (s *server) Stop() {
}
// sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
//
// Deprecated.
// func sendErrorLogMessage(conf *Configuration, metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// // NB: Adding log statement here for more visuality during development.
// log.Printf("%v\n", theError)
// sam := createErrorMsgContent(conf, FromNode, theError)
// newMessagesCh <- []subjectAndMessage{sam}
//
// metrics.promErrorMessagesSentTotal.Inc()
// }
// sendInfoMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
func sendInfoLogMessage(conf *Configuration, metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// NB: Adding log statement here for more visuality during development.
log.Printf("%v\n", theError)
sam := createErrorMsgContent(conf, FromNode, theError)
newMessagesCh <- []subjectAndMessage{sam}
ringBufferBulkInCh <- []subjectAndMessage{sam}
metrics.promInfoMessagesSentTotal.Inc()
}
@ -366,7 +348,7 @@ type samDBValueAndDelivered struct {
// routeMessagesToProcess takes a database name it's input argument.
// The database will be used as the persistent k/v store for the work
// queue which is implemented as a ring buffer.
// The newMessagesCh are where we get new messages to publish.
// The ringBufferInCh are where we get new messages to publish.
// Incomming messages will be routed to the correct subject process, where
// the handling of each nats subject is handled within it's own separate
// worker process.
@ -378,7 +360,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
const samValueBucket string = "samValueBucket"
const indexValueBucket string = "indexValueBucket"
s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.newMessagesCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial)
s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.ringBufferBulkInCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial)
ringBufferInCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValueAndDelivered)
@ -391,7 +373,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// we loop here, unfold the slice, and put single subjectAndMessages's on
// the channel to the ringbuffer.
go func() {
for sams := range s.newMessagesCh {
for sams := range s.ringBufferBulkInCh {
for _, sam := range sams {
ringBufferInCh <- sam
}
@ -483,7 +465,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
proc.spawnWorker(s.processes, s.natsConn)
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)