From ccac973422162096a45097f1e23a62225f93da3a Mon Sep 17 00:00:00 2001
From: postmannen <postmannen@gmail.com>
Date: Sun, 24 Nov 2024 19:12:57 +0100
Subject: [PATCH] added debug

---
 .gitignore   |  1 +
 process.go   |  4 +--
 processes.go | 69 ++++++++++++++++++++++++++++++++++++++++------------
 server.go    | 10 ++++----
 4 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/.gitignore b/.gitignore
index bdfeaf8..2a06fc3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ doc/concept/via/README.md
 notes.txt
 toolbox/
 signing/
+cmd/ctrl/ctrl
diff --git a/process.go b/process.go
index 18c8513..3f785ab 100644
--- a/process.go
+++ b/process.go
@@ -766,9 +766,9 @@ func (p process) publishMessagesNats(natsConn *nats.Conn) {
 		select {
 		case <-ticker.C:
 			if p.isLongRunningPublisher {
-				er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
+				// er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
 				//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
-				p.errorKernel.logDebug(er)
+				// p.errorKernel.logDebug(er)
 
 				continue
 			}
diff --git a/processes.go b/processes.go
index dd7ef47..e7c450f 100644
--- a/processes.go
+++ b/processes.go
@@ -2,9 +2,9 @@ package ctrl
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"log"
-	"strings"
 	"sync"
 	"time"
 
@@ -346,6 +346,7 @@ func (p *processes) Start(proc process) {
 	// --------------------------------------------------
 	if proc.configuration.StartJetstreamPublisher {
 		pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
+			fmt.Printf("######## DEBUG: Publisher: beginning og jetstream publisher: %v\n", "#######")
 			js, err := jetstream.New(proc.natsConn)
 			if err != nil {
 				log.Fatalf("error: jetstream new failed: %v\n", err)
@@ -356,8 +357,9 @@ func (p *processes) Start(proc process) {
 				Description: "nodes stream",
 				Subjects:    []string{"nodes.>"},
 				// Discard older messages and keep only the last one.
-				MaxMsgsPerSubject: 1,
+				// MaxMsgsPerSubject: 1,
 			})
+			fmt.Printf("######## DEBUG: Publisher: CreateOrUpdateStream: %v\n", "#######")
 
 			if err != nil {
 				log.Fatalf("error: jetstream create or update failed: %v\n", err)
@@ -367,16 +369,24 @@ func (p *processes) Start(proc process) {
 				// TODO:
 				select {
 				case msg := <-proc.jetstreamOut:
-					b, err := proc.messageSerializeAndCompress(msg)
+					fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg)
+					// b, err := proc.messageSerializeAndCompress(msg)
+					// if err != nil {
+					// 	log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
+					// }
+					b, err := json.Marshal(msg)
 					if err != nil {
 						log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
 					}
 
 					subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
+					fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###")
 					_, err = js.Publish(proc.ctx, subject, b)
 					if err != nil {
 						log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
 					}
+					fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###")
+
 				case <-ctx.Done():
 					return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
 				}
@@ -396,6 +406,10 @@ func (p *processes) Start(proc process) {
 	// on the local delivery channel, and handled as a normal ctrl message.
 	if proc.configuration.StartJetstreamConsumer {
 		pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
+			fmt.Println("---------------------------------------------------------------")
+			fmt.Printf("--- DEBUG: consumer: starting up jetstream consumer %v\n", "---")
+			fmt.Println("---------------------------------------------------------------")
+
 			js, err := jetstream.New(proc.natsConn)
 			if err != nil {
 				log.Fatalf("error: jetstream new failed: %v\n", err)
@@ -403,23 +417,35 @@ func (p *processes) Start(proc process) {
 
 			stream, err := js.Stream(proc.ctx, "nodes")
 			if err != nil {
-				log.Fatalf("error: js.Stream failed: %v\n", err)
+				log.Printf("error: js.Stream failed: %v\n", err)
+			}
+
+			// stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
+			// 	Name:        "nodes",
+			// 	Description: "nodes stream",
+			// 	Subjects:    []string{"nodes.>"},
+			// 	// Discard older messages and keep only the last one.
+			// 	MaxMsgsPerSubject: 1,
+			// })
+			if err != nil {
+				log.Printf("error: js.Stream failed: %v\n", err)
 			}
 
 			// Check for more subjects via flags to listen to, and if defined prefix all
 			// the values with "nodes."
 			filterSubjectValues := []string{
 				fmt.Sprintf("nodes.%v", proc.server.nodeName),
-				"nodes.all",
+				//"nodes.all",
 			}
+			fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
 
-			// Check if there are more to consume defined in flags/env.
-			if proc.configuration.JetstreamsConsume != "" {
-				splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
-				for i, v := range splitValues {
-					filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
-				}
-			}
+			//// Check if there are more to consume defined in flags/env.
+			//if proc.configuration.JetstreamsConsume != "" {
+			//	splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
+			//	for i, v := range splitValues {
+			//		filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
+			//	}
+			//}
 
 			consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
 				Name:           "nodes_processor",
@@ -430,16 +456,24 @@ func (p *processes) Start(proc process) {
 				log.Fatalf("error: create or update consumer failed: %v\n", err)
 			}
 
+			consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
+			fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo)
+
 			cctx, err := consumer.Consume(func(msg jetstream.Msg) {
+				fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg)
+				msg.Ack()
+
 				stewardMessage := Message{}
-				stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
+				// stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
+				// if err != nil {
+				// 	log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
+				// }
+				err := json.Unmarshal(msg.Data(), &stewardMessage)
 				if err != nil {
 					log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
 				}
 
-				log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
-
-				msg.Ack()
+				log.Printf("----- Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
 
 				// Messages received here via jetstream are for this node. Put the message into
 				// a SubjectAndMessage structure, and we use the deliver local from here.
@@ -447,7 +481,10 @@ func (p *processes) Start(proc process) {
 				if err != nil {
 					log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
 				}
+
+				fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
 				proc.server.samSendLocalCh <- []subjectAndMessage{sam}
+				fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
 			})
 			if err != nil {
 				log.Fatalf("error: create or update consumer failed: %v\n", err)
diff --git a/server.go b/server.go
index c1e8fce..5a48c22 100644
--- a/server.go
+++ b/server.go
@@ -217,17 +217,17 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
 
 	// Prepare the zstd encoder
 	// Prepare the zstd encoder to put into processInitial
-	var zEnc *zstd.Encoder
-	// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
 
 	zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
 	if err != nil {
 		log.Fatalf("error: zstd new encoder failed: %v", err)
 	}
 
-	go func() {
-		<-ctx.Done()
-		zEnc.Close()
+	defer func() {
+		go func() {
+			<-ctx.Done()
+			zstdEncoder.Close()
+		}()
 	}()
 
 	s := server{