diff --git a/.gitignore b/.gitignore index bdfeaf8..2a06fc3 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ doc/concept/via/README.md notes.txt toolbox/ signing/ +cmd/ctrl/ctrl diff --git a/process.go b/process.go index 18c8513..3f785ab 100644 --- a/process.go +++ b/process.go @@ -766,9 +766,9 @@ func (p process) publishMessagesNats(natsConn *nats.Conn) { select { case <-ticker.C: if p.isLongRunningPublisher { - er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName) + // er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er) + // p.errorKernel.logDebug(er) continue } diff --git a/processes.go b/processes.go index dd7ef47..e7c450f 100644 --- a/processes.go +++ b/processes.go @@ -2,9 +2,9 @@ package ctrl import ( "context" + "encoding/json" "fmt" "log" - "strings" "sync" "time" @@ -346,6 +346,7 @@ func (p *processes) Start(proc process) { // -------------------------------------------------- if proc.configuration.StartJetstreamPublisher { pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error { + fmt.Printf("######## DEBUG: Publisher: beginning og jetstream publisher: %v\n", "#######") js, err := jetstream.New(proc.natsConn) if err != nil { log.Fatalf("error: jetstream new failed: %v\n", err) @@ -356,8 +357,9 @@ func (p *processes) Start(proc process) { Description: "nodes stream", Subjects: []string{"nodes.>"}, // Discard older messages and keep only the last one. - MaxMsgsPerSubject: 1, + // MaxMsgsPerSubject: 1, }) + fmt.Printf("######## DEBUG: Publisher: CreateOrUpdateStream: %v\n", "#######") if err != nil { log.Fatalf("error: jetstream create or update failed: %v\n", err) @@ -367,16 +369,24 @@ func (p *processes) Start(proc process) { // TODO: select { case msg := <-proc.jetstreamOut: - b, err := proc.messageSerializeAndCompress(msg) + fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg) + // b, err := proc.messageSerializeAndCompress(msg) + // if err != nil { + // log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err) + // } + b, err := json.Marshal(msg) if err != nil { log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err) } subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode) + fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###") _, err = js.Publish(proc.ctx, subject, b) if err != nil { log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err) } + fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###") + case <-ctx.Done(): return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done") } @@ -396,6 +406,10 @@ func (p *processes) Start(proc process) { // on the local delivery channel, and handled as a normal ctrl message. if proc.configuration.StartJetstreamConsumer { pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error { + fmt.Println("---------------------------------------------------------------") + fmt.Printf("--- DEBUG: consumer: starting up jetstream consumer %v\n", "---") + fmt.Println("---------------------------------------------------------------") + js, err := jetstream.New(proc.natsConn) if err != nil { log.Fatalf("error: jetstream new failed: %v\n", err) @@ -403,23 +417,35 @@ func (p *processes) Start(proc process) { stream, err := js.Stream(proc.ctx, "nodes") if err != nil { - log.Fatalf("error: js.Stream failed: %v\n", err) + log.Printf("error: js.Stream failed: %v\n", err) + } + + // stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{ + // Name: "nodes", + // Description: "nodes stream", + // Subjects: []string{"nodes.>"}, + // // Discard older messages and keep only the last one. + // MaxMsgsPerSubject: 1, + // }) + if err != nil { + log.Printf("error: js.Stream failed: %v\n", err) } // Check for more subjects via flags to listen to, and if defined prefix all // the values with "nodes." filterSubjectValues := []string{ fmt.Sprintf("nodes.%v", proc.server.nodeName), - "nodes.all", + //"nodes.all", } + fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) - // Check if there are more to consume defined in flags/env. - if proc.configuration.JetstreamsConsume != "" { - splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",") - for i, v := range splitValues { - filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) - } - } + //// Check if there are more to consume defined in flags/env. + //if proc.configuration.JetstreamsConsume != "" { + // splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",") + // for i, v := range splitValues { + // filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) + // } + //} consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ Name: "nodes_processor", @@ -430,16 +456,24 @@ func (p *processes) Start(proc process) { log.Fatalf("error: create or update consumer failed: %v\n", err) } + consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues) + fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo) + cctx, err := consumer.Consume(func(msg jetstream.Msg) { + fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg) + msg.Ack() + stewardMessage := Message{} - stewardMessage, err := proc.messageDeserializeAndUncompress(msg) + // stewardMessage, err := proc.messageDeserializeAndUncompress(msg) + // if err != nil { + // log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err) + // } + err := json.Unmarshal(msg.Data(), &stewardMessage) if err != nil { log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err) } - log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method)) - - msg.Ack() + log.Printf("----- Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method)) // Messages received here via jetstream are for this node. Put the message into // a SubjectAndMessage structure, and we use the deliver local from here. @@ -447,7 +481,10 @@ func (p *processes) Start(proc process) { if err != nil { log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err) } + + fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n") proc.server.samSendLocalCh <- []subjectAndMessage{sam} + fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n") }) if err != nil { log.Fatalf("error: create or update consumer failed: %v\n", err) diff --git a/server.go b/server.go index c1e8fce..5a48c22 100644 --- a/server.go +++ b/server.go @@ -217,17 +217,17 @@ func NewServer(configuration *Configuration, version string) (*server, error) { // Prepare the zstd encoder // Prepare the zstd encoder to put into processInitial - var zEnc *zstd.Encoder - // Prepare a zstd encoder so we can reuse the zstd encoder for all messages. zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) if err != nil { log.Fatalf("error: zstd new encoder failed: %v", err) } - go func() { - <-ctx.Done() - zEnc.Close() + defer func() { + go func() { + <-ctx.Done() + zstdEncoder.Close() + }() }() s := server{