From 425553db57c970c52b8d458eaa1fad5406129204 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sun, 24 Nov 2024 22:04:08 +0100 Subject: [PATCH] debugging print, and removed subject filter from consumer --- message_readers.go | 13 ++++++++++++- processes.go | 25 +++++++++++++------------ server.go | 3 +++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/message_readers.go b/message_readers.go index 4e80b43..35ae88d 100644 --- a/message_readers.go +++ b/message_readers.go @@ -267,10 +267,17 @@ func (s *server) readFolder() { } fh.Close() + if len(b) == 0 { + return + } + + fmt.Printf("!!!!!!!DEBUG: readfolder: 1: readbytes: %v\n", b) b = bytes.Trim(b, "\x00") // unmarshal the JSON into a struct + fmt.Printf("!!!!!!!DEBUG: readfolder: 2: readbytes: %v\n", b) sams, err := s.convertBytesToSAMs(b) + fmt.Printf("!!!!!!!DEBUG: readfolder: done convertBytesToSAMs, got sams: %v\n", sams) if err != nil { er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) @@ -289,12 +296,16 @@ func (s *server) readFolder() { s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) } - er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams) + er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.newMessagesCh: %#v", sams) s.errorKernel.logDebug(er) // Send the SAM struct to be picked up by the ring buffer. + fmt.Print("!!!!!!!DEBUG: readfolder: Before putting on newMessagesCh\n") s.newMessagesCh <- sams + fmt.Print("!!!!!!!DEBUG: readfolder: After putting on newMessagesCh\n") + fmt.Print("!!!!!!!DEBUG: readfolder: Before putting on auditLogCh\n") s.auditLogCh <- sams + fmt.Print("!!!!!!!DEBUG: readfolder: After putting on auditLogCh\n") // Delete the file. err = os.Remove(event.Name) diff --git a/processes.go b/processes.go index e7c450f..6e187ed 100644 --- a/processes.go +++ b/processes.go @@ -380,12 +380,12 @@ func (p *processes) Start(proc process) { } subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode) - fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###") + fmt.Printf("######## DEBUG: Publisher: before publish on subject: %v\n", subject) _, err = js.Publish(proc.ctx, subject, b) if err != nil { log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err) } - fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###") + fmt.Printf("######## DEBUG: Publisher: after publish on subject: %v\n", subject) case <-ctx.Done(): return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done") @@ -433,11 +433,11 @@ func (p *processes) Start(proc process) { // Check for more subjects via flags to listen to, and if defined prefix all // the values with "nodes." - filterSubjectValues := []string{ - fmt.Sprintf("nodes.%v", proc.server.nodeName), - //"nodes.all", - } - fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) + ////filterSubjectValues := []string{ + //// fmt.Sprintf("nodes.%v", proc.server.nodeName), + //// //"nodes.all", + ////} + ////fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) //// Check if there are more to consume defined in flags/env. //if proc.configuration.JetstreamsConsume != "" { @@ -448,16 +448,17 @@ func (p *processes) Start(proc process) { //} consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ - Name: "nodes_processor", - Durable: "nodes_processor", - FilterSubjects: filterSubjectValues, + Name: "nodes_processor", + Durable: "nodes_processor", + //FilterSubjects: filterSubjectValues, }) if err != nil { log.Fatalf("error: create or update consumer failed: %v\n", err) } - consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) - fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo) + consumerInfo, _ := consumer.Info(proc.ctx) + fmt.Printf("--- DEBUG: consumer: consumerInfo: %v\n", consumerInfo) + //fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo) cctx, err := consumer.Consume(func(msg jetstream.Msg) { fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg) diff --git a/server.go b/server.go index 5a48c22..1a8e5f6 100644 --- a/server.go +++ b/server.go @@ -520,8 +520,11 @@ func (s *server) routeMessagesToProcess() { // If the message have the JetstreamToNode field specified // deliver it via the jet stream processes, and abort trying // to send it via the normal nats publisher. + fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess: checking if it is a jetstram message: %v\n", sam.Message.JetstreamToNode) if sam.Message.JetstreamToNode != "" { + fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess: it was a jetstram message, putting it on jetstreamOutCh: %v\n", sam.Message.JetstreamToNode) s.jetstreamOutCh <- sam.Message + fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess:done putting it on jetstreamOutCh: %v\n", sam.Message.JetstreamToNode) continue }