diff --git a/errorkernel.go b/errorkernel.go index 110e709..9fbdc6d 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -68,7 +68,7 @@ const logNone logLevel = "none" // process if it should continue or not based not based on how severe // the error where. This should be right after sending the error // sending in the process. -func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error { +func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error { // Initiate the slog logger. var replaceFunc func(groups []string, a slog.Attr) slog.Attr if !e.configuration.LogConsoleTimestamps { @@ -131,13 +131,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error { Retries: errEvent.process.configuration.ErrorMessageRetries, } - sam := subjectAndMessage{ - Subject: newSubject(ErrorLog, "errorCentral"), - Message: m, - } - // Put the message on the channel to the ringbuffer. - ringBufferBulkInCh <- sam + ringBufferBulkInCh <- m // if errEvent.process.configuration.EnableDebug { // log.Printf("%v\n", er) diff --git a/message_readers.go b/message_readers.go index 223d6a7..2948124 100644 --- a/message_readers.go +++ b/message_readers.go @@ -77,7 +77,7 @@ func (s *server) readStartupFolder() { readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sams, err := s.convertBytesToSAMs(readBytes) + messages, err := s.convertBytesToMessages(readBytes) if err != nil { er := fmt.Errorf("error: startup folder: malformed json read: %v", err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) @@ -85,38 +85,35 @@ func (s *server) readStartupFolder() { } // Check if fromNode field is specified, and remove the message if blank. - for i := range sams { + for i := range messages { // We want to allow the use of nodeName local only in startup folder, and // if used we substite it for the local node name. - if sams[i].Message.ToNode == "local" { - sams[i].Message.ToNode = Node(s.nodeName) - sams[i].Subject.ToNode = s.nodeName + if messages[i].ToNode == "local" { + messages[i].ToNode = Node(s.nodeName) } switch { - case sams[i].Message.FromNode == "": - // Remove the first message from the slice. - sams = append(sams[:i], sams[i+1:]...) + case messages[i].FromNode == "": er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message") s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) + continue - case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0: - // Remove the first message from the slice. - sams = append(sams[:i], sams[i+1:]...) + case messages[i].ToNode == "" && len(messages[i].ToNodes) == 0: er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message") s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) + continue } } - j, err := json.MarshalIndent(sams, "", " ") + j, err := json.MarshalIndent(messages, "", " ") if err != nil { log.Printf("test error: %v\n", err) } er = fmt.Errorf("%v", string(j)) s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) - s.messageDeliverLocalCh <- sams + s.messageDeliverLocalCh <- messages } @@ -212,15 +209,7 @@ func (s *server) jetstreamConsume() { // nodeName of the consumer in the ctrl Message, so we are sure it is handled locally. m.ToNode = Node(s.nodeName) - sam, err := newSubjectAndMessage(m) - if err != nil { - er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er, logError) - return - } - - // If a message is received via - s.messageDeliverLocalCh <- []subjectAndMessage{sam} + s.messageDeliverLocalCh <- []Message{m} }) defer consumeContext.Stop() @@ -300,30 +289,30 @@ func (s *server) readSocket() { readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sams, err := s.convertBytesToSAMs(readBytes) + messages, err := s.convertBytesToMessages(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } - for i := range sams { + for i := range messages { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sams[i].Message.FromNode = Node(s.nodeName) + messages[i].FromNode = Node(s.nodeName) // Send an info message to the central about the message picked // for auditing. - er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) + er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, messages[i]) s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) - s.newMessagesCh <- sams[i] + s.newMessagesCh <- messages[i] } // Send the SAM struct to be picked up by the ring buffer. - s.auditLogCh <- sams + s.auditLogCh <- messages }(conn) } @@ -384,42 +373,42 @@ func (s *server) readFolder() { b = bytes.Trim(b, "\x00") // unmarshal the JSON into a struct - sams, err := s.convertBytesToSAMs(b) + messages, err := s.convertBytesToMessages(b) if err != nil { er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } - for i := range sams { + for i := range messages { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sams[i].Message.FromNode = Node(s.nodeName) + messages[i].FromNode = Node(s.nodeName) // Send an info message to the central about the message picked // for auditing. - er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message) + er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, messages[i]) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) // Check if it is a message to publish with Jetstream. - if sams[i].Message.JetstreamToNode != "" { + if messages[i].JetstreamToNode != "" { - s.jetstreamPublishCh <- sams[i].Message - er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams) + s.jetstreamPublishCh <- messages[i] + er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", messages) s.errorKernel.logDebug(er) continue } - s.newMessagesCh <- sams[i] + s.newMessagesCh <- messages[i] - er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams) + er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", messages) s.errorKernel.logDebug(er) } // Send the SAM struct to be picked up by the ring buffer. - s.auditLogCh <- sams + s.auditLogCh <- messages // Delete the file. err = os.Remove(event.Name) @@ -496,23 +485,23 @@ func (s *server) readTCPListener() { readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sams, err := s.convertBytesToSAMs(readBytes) + messages, err := s.convertBytesToMessages(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on tcp listener: %v", err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } - for i := range sams { + for i := range messages { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sams[i].Message.FromNode = Node(s.nodeName) - s.newMessagesCh <- sams[i] + messages[i].FromNode = Node(s.nodeName) + s.newMessagesCh <- messages[i] } // Send the SAM struct to be picked up by the ring buffer. - s.auditLogCh <- sams + s.auditLogCh <- messages }(conn) } @@ -541,23 +530,23 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sams, err := s.convertBytesToSAMs(readBytes) + messages, err := s.convertBytesToMessages(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } - for i := range sams { + for i := range messages { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sams[i].Message.FromNode = Node(s.nodeName) - s.newMessagesCh <- sams[i] + messages[i].FromNode = Node(s.nodeName) + s.newMessagesCh <- messages[i] } // Send the SAM struct to be picked up by the ring buffer. - s.auditLogCh <- sams + s.auditLogCh <- messages } @@ -595,7 +584,7 @@ type subjectAndMessage struct { // json format. For each element found the Message type will be converted into // a SubjectAndMessage type value and appended to a slice, and the slice is // returned to the caller. -func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { +func (s *server) convertBytesToMessages(b []byte) ([]Message, error) { MsgSlice := []Message{} err := yaml.Unmarshal(b, &MsgSlice) @@ -607,22 +596,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { MsgSlice = s.checkMessageToNodes(MsgSlice) s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice))) - sam := []subjectAndMessage{} - - // Range over all the messages parsed from json, and create a subject for - // each message. - for _, m := range MsgSlice { - sm, err := newSubjectAndMessage(m) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage: %v", err) - s.errorKernel.errSend(s.processInitial, m, er, logWarning) - - continue - } - sam = append(sam, sm) - } - - return sam, nil + return MsgSlice, nil } // checkMessageToNodes will check that either toHost or toHosts are diff --git a/process.go b/process.go index 7b7095c..53ef14f 100644 --- a/process.go +++ b/process.go @@ -81,7 +81,7 @@ type process struct { // copy of the configuration from server configuration *Configuration // The new messages channel copied from *Server - newMessagesCh chan<- subjectAndMessage + newMessagesCh chan<- Message // The structure who holds all processes information processes *processes // nats connection diff --git a/processes.go b/processes.go index 8edd819..57adb7d 100644 --- a/processes.go +++ b/processes.go @@ -185,13 +185,7 @@ func (p *processes) Start(proc process) { Retries: 1, } - sam, err := newSubjectAndMessage(m) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: ProcessesStart: %v", err) - p.errorKernel.errSend(proc, m, er, logError) - } - proc.newMessagesCh <- sam + proc.newMessagesCh <- m select { case <-ticker.C: @@ -236,12 +230,7 @@ func (p *processes) Start(proc process) { } proc.nodeAuth.publicKeys.mu.Unlock() - sam, err := newSubjectAndMessage(m) - if err != nil { - // In theory the system should drop the message before it reaches here. - p.errorKernel.errSend(proc, m, err, logError) - } - proc.newMessagesCh <- sam + proc.newMessagesCh <- m select { case <-ticker.C: @@ -284,13 +273,7 @@ func (p *processes) Start(proc process) { } proc.nodeAuth.nodeAcl.mu.Unlock() - sam, err := newSubjectAndMessage(m) - if err != nil { - // In theory the system should drop the message before it reaches here. - p.errorKernel.errSend(proc, m, err, logError) - log.Printf("error: ProcessesStart: %v\n", err) - } - proc.newMessagesCh <- sam + proc.newMessagesCh <- m select { case <-ticker.C: diff --git a/requests.go b/requests.go index 9fde69c..730e2a6 100644 --- a/requests.go +++ b/requests.go @@ -352,14 +352,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { PreviousMessage: &thisMsg, } - sam, err := newSubjectAndMessage(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er, logError) - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- newMsg } // selectFileNaming will figure out the correct naming of the file diff --git a/requests_copy.go b/requests_copy.go index 1085a49..a759fbd 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -86,13 +86,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // we should forward the message to that specified toNode. This will allow // us to initiate a file copy from another node to this node. if message.ToNode != proc.node { - sam, err := newSubjectAndMessage(message) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - } - proc.newMessagesCh <- sam + proc.newMessagesCh <- message return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message) } @@ -260,14 +255,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // msg.Directory = dstDir // msg.FileName = dstFile - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - cancel() - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName) @@ -558,15 +546,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel ReplyRetries: initialMessage.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg resendRetries = 0 @@ -628,15 +608,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel ReplyRetries: initialMessage.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg resendRetries++ @@ -692,14 +664,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc ReplyRetries: message.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg } // Open a tmp folder for where to write the received chunks @@ -794,14 +759,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc ReplyRetries: message.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg case copyResendLast: // The csa already contains copyStatus copyResendLast when reached here, @@ -827,14 +785,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc ReplyRetries: message.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg case copySrcDone: err := func() error { @@ -980,14 +931,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc ReplyRetries: message.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg } cancel() diff --git a/requests_keys.go b/requests_keys.go index 6df557a..fea983a 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -323,14 +323,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { ACKTimeout: 0, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) proc.errorKernel.logDebug(er) @@ -370,14 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { ACKTimeout: 0, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode) proc.errorKernel.logDebug(er) diff --git a/requests_test.go b/requests_test.go index ece76f7..652b9a3 100644 --- a/requests_test.go +++ b/requests_test.go @@ -285,12 +285,7 @@ func TestRequest(t *testing.T) { for _, tt := range tests { switch tt.viaSocketOrCh { case viaCh: - sam, err := newSubjectAndMessage(tt.message) - if err != nil { - t.Fatalf("newSubjectAndMessage failed: %v\n", err) - } - - tstSrv.newMessagesCh <- sam + tstSrv.newMessagesCh <- tt.message case viaSocket: msgs := []Message{tt.message} @@ -339,6 +334,7 @@ func TestRequest(t *testing.T) { checkREQTailFileTest(tstConf, t, tstTempDir) checkMetricValuesTest(tstSrv, t) checkErrorKernelMalformedJSONtest(tstConf, t) + t.Log("*******starting with checkREQCopySrc\n") checkREQCopySrc(tstConf, t, tstTempDir) } diff --git a/server.go b/server.go index f1c4a6d..4642882 100644 --- a/server.go +++ b/server.go @@ -50,9 +50,9 @@ type server struct { // // In general the ringbuffer will read this // channel, unfold each slice, and put single messages on the buffer. - newMessagesCh chan subjectAndMessage + newMessagesCh chan Message // messageDeliverLocalCh - messageDeliverLocalCh chan []subjectAndMessage + messageDeliverLocalCh chan []Message // Channel for messages to publish with Jetstream. jetstreamPublishCh chan Message // errorKernel is doing all the error handling like what to do if @@ -76,7 +76,7 @@ type server struct { // message ID messageID messageID // audit logging - auditLogCh chan []subjectAndMessage + auditLogCh chan []Message zstdEncoder *zstd.Encoder } @@ -234,8 +234,8 @@ func NewServer(configuration *Configuration, version string) (*server, error) { nodeName: configuration.NodeName, natsConn: conn, ctrlSocket: ctrlSocket, - newMessagesCh: make(chan subjectAndMessage), - messageDeliverLocalCh: make(chan []subjectAndMessage), + newMessagesCh: make(chan Message), + messageDeliverLocalCh: make(chan []Message), jetstreamPublishCh: make(chan Message), metrics: metrics, version: version, @@ -243,7 +243,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { nodeAuth: nodeAuth, helloRegister: newHelloRegister(), centralAuth: centralAuth, - auditLogCh: make(chan []subjectAndMessage), + auditLogCh: make(chan []Message), zstdEncoder: zstdEncoder, } @@ -397,11 +397,11 @@ func (s *server) startAuditLog(ctx context.Context) { for { select { - case sams := <-s.auditLogCh: + case messages := <-s.auditLogCh: - for _, sam := range sams { + for _, message := range messages { msgForPermStore := Message{} - copier.Copy(&msgForPermStore, sam.Message) + copier.Copy(&msgForPermStore, message) // Remove the content of the data field. msgForPermStore.Data = nil @@ -433,27 +433,29 @@ func (s *server) directSAMSChRead() { case <-s.ctx.Done(): log.Printf("info: stopped the directSAMSCh reader\n\n") return - case sams := <-s.messageDeliverLocalCh: + case messages := <-s.messageDeliverLocalCh: // fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams) // Range over all the sams, find the process, check if the method exists, and // handle the message by starting the correct method handler. - for i := range sams { - processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) + for i := range messages { + // TODO: !!!!!! Shoud the node here be the fromNode ??????? + subject := newSubject(messages[i].Method, string(messages[i].ToNode)) + processName := processNameGet(subject.name(), processKindSubscriber) s.processes.active.mu.Lock() p := s.processes.active.procNames[processName] s.processes.active.mu.Unlock() - mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) + mh, ok := p.methodsAvailable.CheckIfExists(messages[i].Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Method) - p.errorKernel.errSend(p, sams[i].Message, er, logError) + p.errorKernel.errSend(p, messages[i], er, logError) continue } p.handler = mh - go executeHandler(p, sams[i].Message, s.nodeName) + go executeHandler(p, messages[i], s.nodeName) } } } @@ -508,8 +510,7 @@ func (s *server) routeMessagesToPublisherProcess() { methodsAvailable := method.GetMethodsAvailable() go func() { - for sam1 := range s.newMessagesCh { - message := sam1.Message + for message := range s.newMessagesCh { go func(message Message) {