From 9ea823293c5ccf66345442573369e99c871b1a75 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 25 Nov 2024 18:03:12 +0100 Subject: [PATCH] debug --- process.go | 5 +++++ processes.go | 59 ++++++++++++++++++++++++++++------------------------ server.go | 5 +++++ 3 files changed, 42 insertions(+), 27 deletions(-) diff --git a/process.go b/process.go index 3f785ab..7084295 100644 --- a/process.go +++ b/process.go @@ -596,6 +596,11 @@ func (p process) callHandler(message Message, thisNode string) []byte { // executeHandler will call the handler for the Request type defined in the message. func executeHandler(p process, message Message, thisNode string) { var err error + if message.ToNode != "errorCentral" { + fmt.Printf("??????? DEBUG: executeHandler: got message: %v\n", message) + fmt.Printf("??????? DEBUG: executeHandler: got thisNode: %v\n", thisNode) + fmt.Printf("??????? DEBUG: executeHandler: got process: %+v\n", p) + } // Check if it is a message to run scheduled. var interval int diff --git a/processes.go b/processes.go index 6e187ed..c308a30 100644 --- a/processes.go +++ b/processes.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "sync" "time" @@ -369,7 +370,7 @@ func (p *processes) Start(proc process) { // TODO: select { case msg := <-proc.jetstreamOut: - fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg) + fmt.Printf("\n######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg) // b, err := proc.messageSerializeAndCompress(msg) // if err != nil { // log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err) @@ -381,6 +382,7 @@ func (p *processes) Start(proc process) { subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode) fmt.Printf("######## DEBUG: Publisher: before publish on subject: %v\n", subject) + _, err = js.Publish(proc.ctx, subject, b) if err != nil { log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err) @@ -415,42 +417,45 @@ func (p *processes) Start(proc process) { log.Fatalf("error: jetstream new failed: %v\n", err) } - stream, err := js.Stream(proc.ctx, "nodes") - if err != nil { - log.Printf("error: js.Stream failed: %v\n", err) - } + // stream, err := js.Stream(proc.ctx, "nodes") + // if err != nil { + // log.Printf("error: js.Stream failed: %v\n", err) + // } - // stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{ - // Name: "nodes", - // Description: "nodes stream", - // Subjects: []string{"nodes.>"}, - // // Discard older messages and keep only the last one. - // MaxMsgsPerSubject: 1, - // }) + stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{ + Name: "nodes", + Description: "nodes stream", + Subjects: []string{"nodes.>"}, + // Discard older messages and keep only the last one. + MaxMsgsPerSubject: 10, + }) if err != nil { log.Printf("error: js.Stream failed: %v\n", err) } // Check for more subjects via flags to listen to, and if defined prefix all // the values with "nodes." - ////filterSubjectValues := []string{ - //// fmt.Sprintf("nodes.%v", proc.server.nodeName), - //// //"nodes.all", - ////} - ////fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) + fmt.Println("-------------------------------------------------------") + fmt.Printf(" DEBUG: start consumer: proc.server.nodeName: %v\n", proc.server.nodeName) + fmt.Println("-------------------------------------------------------") + filterSubjectValues := []string{ + fmt.Sprintf("nodes.%v", proc.server.nodeName), + //"nodes.all", + } + fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) - //// Check if there are more to consume defined in flags/env. - //if proc.configuration.JetstreamsConsume != "" { - // splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",") - // for i, v := range splitValues { - // filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) - // } - //} + // Check if there are more to consume defined in flags/env. + if proc.configuration.JetstreamsConsume != "" { + splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",") + for i, v := range splitValues { + filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) + } + } consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ - Name: "nodes_processor", - Durable: "nodes_processor", - //FilterSubjects: filterSubjectValues, + Name: "nodes_processor", + Durable: "nodes_processor", + FilterSubjects: filterSubjectValues, }) if err != nil { log.Fatalf("error: create or update consumer failed: %v\n", err) diff --git a/server.go b/server.go index 40847e4..3a6f297 100644 --- a/server.go +++ b/server.go @@ -446,6 +446,11 @@ func (s *server) directSAMSChRead() { for i, sam := range sams { processName := processNameGet(sams[i].Subject.name(), processKindSubscriberNats) + if processName == "" { + fmt.Printf("error: processName was empty, sams[%v] was: %#v\n", i, sams[i]) + os.Exit(1) + } + s.processes.active.mu.Lock() p := s.processes.active.procNames[processName] s.processes.active.mu.Unlock()