1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed sams from channels

This commit is contained in:
postmannen 2024-12-01 03:08:40 +01:00
parent 9fca6d0b7f
commit 49be8947fa
9 changed files with 76 additions and 204 deletions

View file

@ -68,7 +68,7 @@ const logNone logLevel = "none"
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error {
// Initiate the slog logger.
var replaceFunc func(groups []string, a slog.Attr) slog.Attr
if !e.configuration.LogConsoleTimestamps {
@ -131,13 +131,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
Retries: errEvent.process.configuration.ErrorMessageRetries,
}
sam := subjectAndMessage{
Subject: newSubject(ErrorLog, "errorCentral"),
Message: m,
}
// Put the message on the channel to the ringbuffer.
ringBufferBulkInCh <- sam
ringBufferBulkInCh <- m
// if errEvent.process.configuration.EnableDebug {
// log.Printf("%v\n", er)

View file

@ -77,7 +77,7 @@ func (s *server) readStartupFolder() {
readBytes = bytes.Trim(readBytes, "\x00")
// unmarshal the JSON into a struct
sams, err := s.convertBytesToSAMs(readBytes)
messages, err := s.convertBytesToMessages(readBytes)
if err != nil {
er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
@ -85,38 +85,35 @@ func (s *server) readStartupFolder() {
}
// Check if fromNode field is specified, and remove the message if blank.
for i := range sams {
for i := range messages {
// We want to allow the use of nodeName local only in startup folder, and
// if used we substite it for the local node name.
if sams[i].Message.ToNode == "local" {
sams[i].Message.ToNode = Node(s.nodeName)
sams[i].Subject.ToNode = s.nodeName
if messages[i].ToNode == "local" {
messages[i].ToNode = Node(s.nodeName)
}
switch {
case sams[i].Message.FromNode == "":
// Remove the first message from the slice.
sams = append(sams[:i], sams[i+1:]...)
case messages[i].FromNode == "":
er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message")
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
continue
case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0:
// Remove the first message from the slice.
sams = append(sams[:i], sams[i+1:]...)
case messages[i].ToNode == "" && len(messages[i].ToNodes) == 0:
er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message")
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
continue
}
}
j, err := json.MarshalIndent(sams, "", " ")
j, err := json.MarshalIndent(messages, "", " ")
if err != nil {
log.Printf("test error: %v\n", err)
}
er = fmt.Errorf("%v", string(j))
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
s.messageDeliverLocalCh <- sams
s.messageDeliverLocalCh <- messages
}
@ -212,15 +209,7 @@ func (s *server) jetstreamConsume() {
// nodeName of the consumer in the ctrl Message, so we are sure it is handled locally.
m.ToNode = Node(s.nodeName)
sam, err := newSubjectAndMessage(m)
if err != nil {
er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
return
}
// If a message is received via
s.messageDeliverLocalCh <- []subjectAndMessage{sam}
s.messageDeliverLocalCh <- []Message{m}
})
defer consumeContext.Stop()
@ -300,30 +289,30 @@ func (s *server) readSocket() {
readBytes = bytes.Trim(readBytes, "\x00")
// unmarshal the JSON into a struct
sams, err := s.convertBytesToSAMs(readBytes)
messages, err := s.convertBytesToMessages(readBytes)
if err != nil {
er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
return
}
for i := range sams {
for i := range messages {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
messages[i].FromNode = Node(s.nodeName)
// Send an info message to the central about the message picked
// for auditing.
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, messages[i])
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
s.newMessagesCh <- sams[i]
s.newMessagesCh <- messages[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.auditLogCh <- sams
s.auditLogCh <- messages
}(conn)
}
@ -384,42 +373,42 @@ func (s *server) readFolder() {
b = bytes.Trim(b, "\x00")
// unmarshal the JSON into a struct
sams, err := s.convertBytesToSAMs(b)
messages, err := s.convertBytesToMessages(b)
if err != nil {
er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
return
}
for i := range sams {
for i := range messages {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
messages[i].FromNode = Node(s.nodeName)
// Send an info message to the central about the message picked
// for auditing.
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, messages[i])
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
// Check if it is a message to publish with Jetstream.
if sams[i].Message.JetstreamToNode != "" {
if messages[i].JetstreamToNode != "" {
s.jetstreamPublishCh <- sams[i].Message
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams)
s.jetstreamPublishCh <- messages[i]
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", messages)
s.errorKernel.logDebug(er)
continue
}
s.newMessagesCh <- sams[i]
s.newMessagesCh <- messages[i]
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", messages)
s.errorKernel.logDebug(er)
}
// Send the SAM struct to be picked up by the ring buffer.
s.auditLogCh <- sams
s.auditLogCh <- messages
// Delete the file.
err = os.Remove(event.Name)
@ -496,23 +485,23 @@ func (s *server) readTCPListener() {
readBytes = bytes.Trim(readBytes, "\x00")
// unmarshal the JSON into a struct
sams, err := s.convertBytesToSAMs(readBytes)
messages, err := s.convertBytesToMessages(readBytes)
if err != nil {
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
return
}
for i := range sams {
for i := range messages {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
s.newMessagesCh <- sams[i]
messages[i].FromNode = Node(s.nodeName)
s.newMessagesCh <- messages[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.auditLogCh <- sams
s.auditLogCh <- messages
}(conn)
}
@ -541,23 +530,23 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
readBytes = bytes.Trim(readBytes, "\x00")
// unmarshal the JSON into a struct
sams, err := s.convertBytesToSAMs(readBytes)
messages, err := s.convertBytesToMessages(readBytes)
if err != nil {
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
return
}
for i := range sams {
for i := range messages {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
s.newMessagesCh <- sams[i]
messages[i].FromNode = Node(s.nodeName)
s.newMessagesCh <- messages[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.auditLogCh <- sams
s.auditLogCh <- messages
}
@ -595,7 +584,7 @@ type subjectAndMessage struct {
// json format. For each element found the Message type will be converted into
// a SubjectAndMessage type value and appended to a slice, and the slice is
// returned to the caller.
func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
func (s *server) convertBytesToMessages(b []byte) ([]Message, error) {
MsgSlice := []Message{}
err := yaml.Unmarshal(b, &MsgSlice)
@ -607,22 +596,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
MsgSlice = s.checkMessageToNodes(MsgSlice)
s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice)))
sam := []subjectAndMessage{}
// Range over all the messages parsed from json, and create a subject for
// each message.
for _, m := range MsgSlice {
sm, err := newSubjectAndMessage(m)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
s.errorKernel.errSend(s.processInitial, m, er, logWarning)
continue
}
sam = append(sam, sm)
}
return sam, nil
return MsgSlice, nil
}
// checkMessageToNodes will check that either toHost or toHosts are

View file

@ -81,7 +81,7 @@ type process struct {
// copy of the configuration from server
configuration *Configuration
// The new messages channel copied from *Server
newMessagesCh chan<- subjectAndMessage
newMessagesCh chan<- Message
// The structure who holds all processes information
processes *processes
// nats connection

View file

@ -185,13 +185,7 @@ func (p *processes) Start(proc process) {
Retries: 1,
}
sam, err := newSubjectAndMessage(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: ProcessesStart: %v", err)
p.errorKernel.errSend(proc, m, er, logError)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- m
select {
case <-ticker.C:
@ -236,12 +230,7 @@ func (p *processes) Start(proc process) {
}
proc.nodeAuth.publicKeys.mu.Unlock()
sam, err := newSubjectAndMessage(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
p.errorKernel.errSend(proc, m, err, logError)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- m
select {
case <-ticker.C:
@ -284,13 +273,7 @@ func (p *processes) Start(proc process) {
}
proc.nodeAuth.nodeAcl.mu.Unlock()
sam, err := newSubjectAndMessage(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
p.errorKernel.errSend(proc, m, err, logError)
log.Printf("error: ProcessesStart: %v\n", err)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- m
select {
case <-ticker.C:

View file

@ -352,14 +352,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
PreviousMessage: &thisMsg,
}
sam, err := newSubjectAndMessage(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logError)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- newMsg
}
// selectFileNaming will figure out the correct naming of the file

View file

@ -86,13 +86,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// we should forward the message to that specified toNode. This will allow
// us to initiate a file copy from another node to this node.
if message.ToNode != proc.node {
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- message
return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
}
@ -260,14 +255,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// msg.Directory = dstDir
// msg.FileName = dstFile
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
cancel()
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)
@ -558,15 +546,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
ReplyRetries: initialMessage.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
resendRetries = 0
@ -628,15 +608,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
ReplyRetries: initialMessage.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
resendRetries++
@ -692,14 +664,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
}
// Open a tmp folder for where to write the received chunks
@ -794,14 +759,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
case copyResendLast:
// The csa already contains copyStatus copyResendLast when reached here,
@ -827,14 +785,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
case copySrcDone:
err := func() error {
@ -980,14 +931,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
}
cancel()

View file

@ -323,14 +323,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
ACKTimeout: 0,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
proc.errorKernel.logDebug(er)
@ -370,14 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
ACKTimeout: 0,
}
sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
proc.errorKernel.logDebug(er)

View file

@ -285,12 +285,7 @@ func TestRequest(t *testing.T) {
for _, tt := range tests {
switch tt.viaSocketOrCh {
case viaCh:
sam, err := newSubjectAndMessage(tt.message)
if err != nil {
t.Fatalf("newSubjectAndMessage failed: %v\n", err)
}
tstSrv.newMessagesCh <- sam
tstSrv.newMessagesCh <- tt.message
case viaSocket:
msgs := []Message{tt.message}
@ -339,6 +334,7 @@ func TestRequest(t *testing.T) {
checkREQTailFileTest(tstConf, t, tstTempDir)
checkMetricValuesTest(tstSrv, t)
checkErrorKernelMalformedJSONtest(tstConf, t)
t.Log("*******starting with checkREQCopySrc\n")
checkREQCopySrc(tstConf, t, tstTempDir)
}

View file

@ -50,9 +50,9 @@ type server struct {
//
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan subjectAndMessage
newMessagesCh chan Message
// messageDeliverLocalCh
messageDeliverLocalCh chan []subjectAndMessage
messageDeliverLocalCh chan []Message
// Channel for messages to publish with Jetstream.
jetstreamPublishCh chan Message
// errorKernel is doing all the error handling like what to do if
@ -76,7 +76,7 @@ type server struct {
// message ID
messageID messageID
// audit logging
auditLogCh chan []subjectAndMessage
auditLogCh chan []Message
zstdEncoder *zstd.Encoder
}
@ -234,8 +234,8 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan subjectAndMessage),
messageDeliverLocalCh: make(chan []subjectAndMessage),
newMessagesCh: make(chan Message),
messageDeliverLocalCh: make(chan []Message),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
@ -243,7 +243,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
auditLogCh: make(chan []Message),
zstdEncoder: zstdEncoder,
}
@ -397,11 +397,11 @@ func (s *server) startAuditLog(ctx context.Context) {
for {
select {
case sams := <-s.auditLogCh:
case messages := <-s.auditLogCh:
for _, sam := range sams {
for _, message := range messages {
msgForPermStore := Message{}
copier.Copy(&msgForPermStore, sam.Message)
copier.Copy(&msgForPermStore, message)
// Remove the content of the data field.
msgForPermStore.Data = nil
@ -433,27 +433,29 @@ func (s *server) directSAMSChRead() {
case <-s.ctx.Done():
log.Printf("info: stopped the directSAMSCh reader\n\n")
return
case sams := <-s.messageDeliverLocalCh:
case messages := <-s.messageDeliverLocalCh:
// fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams)
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
for i := range messages {
// TODO: !!!!!! Shoud the node here be the fromNode ???????
subject := newSubject(messages[i].Method, string(messages[i].ToNode))
processName := processNameGet(subject.name(), processKindSubscriber)
s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock()
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
mh, ok := p.methodsAvailable.CheckIfExists(messages[i].Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Method)
p.errorKernel.errSend(p, sams[i].Message, er, logError)
p.errorKernel.errSend(p, messages[i], er, logError)
continue
}
p.handler = mh
go executeHandler(p, sams[i].Message, s.nodeName)
go executeHandler(p, messages[i], s.nodeName)
}
}
}
@ -508,8 +510,7 @@ func (s *server) routeMessagesToPublisherProcess() {
methodsAvailable := method.GetMethodsAvailable()
go func() {
for sam1 := range s.newMessagesCh {
message := sam1.Message
for message := range s.newMessagesCh {
go func(message Message) {