1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
This commit is contained in:
postmannen 2024-11-25 18:03:12 +01:00
parent b821aebc03
commit 9ea823293c
3 changed files with 42 additions and 27 deletions

View file

@ -596,6 +596,11 @@ func (p process) callHandler(message Message, thisNode string) []byte {
// executeHandler will call the handler for the Request type defined in the message.
func executeHandler(p process, message Message, thisNode string) {
var err error
if message.ToNode != "errorCentral" {
fmt.Printf("??????? DEBUG: executeHandler: got message: %v\n", message)
fmt.Printf("??????? DEBUG: executeHandler: got thisNode: %v\n", thisNode)
fmt.Printf("??????? DEBUG: executeHandler: got process: %+v\n", p)
}
// Check if it is a message to run scheduled.
var interval int

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
@ -369,7 +370,7 @@ func (p *processes) Start(proc process) {
// TODO:
select {
case msg := <-proc.jetstreamOut:
fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg)
fmt.Printf("\n######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg)
// b, err := proc.messageSerializeAndCompress(msg)
// if err != nil {
// log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
@ -381,6 +382,7 @@ func (p *processes) Start(proc process) {
subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
fmt.Printf("######## DEBUG: Publisher: before publish on subject: %v\n", subject)
_, err = js.Publish(proc.ctx, subject, b)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
@ -415,42 +417,45 @@ func (p *processes) Start(proc process) {
log.Fatalf("error: jetstream new failed: %v\n", err)
}
stream, err := js.Stream(proc.ctx, "nodes")
if err != nil {
log.Printf("error: js.Stream failed: %v\n", err)
}
// stream, err := js.Stream(proc.ctx, "nodes")
// if err != nil {
// log.Printf("error: js.Stream failed: %v\n", err)
// }
// stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
// Name: "nodes",
// Description: "nodes stream",
// Subjects: []string{"nodes.>"},
// // Discard older messages and keep only the last one.
// MaxMsgsPerSubject: 1,
// })
stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
Name: "nodes",
Description: "nodes stream",
Subjects: []string{"nodes.>"},
// Discard older messages and keep only the last one.
MaxMsgsPerSubject: 10,
})
if err != nil {
log.Printf("error: js.Stream failed: %v\n", err)
}
// Check for more subjects via flags to listen to, and if defined prefix all
// the values with "nodes."
////filterSubjectValues := []string{
//// fmt.Sprintf("nodes.%v", proc.server.nodeName),
//// //"nodes.all",
////}
////fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
fmt.Println("-------------------------------------------------------")
fmt.Printf(" DEBUG: start consumer: proc.server.nodeName: %v\n", proc.server.nodeName)
fmt.Println("-------------------------------------------------------")
filterSubjectValues := []string{
fmt.Sprintf("nodes.%v", proc.server.nodeName),
//"nodes.all",
}
fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
//// Check if there are more to consume defined in flags/env.
//if proc.configuration.JetstreamsConsume != "" {
// splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
// for i, v := range splitValues {
// filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
// }
//}
// Check if there are more to consume defined in flags/env.
if proc.configuration.JetstreamsConsume != "" {
splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
for i, v := range splitValues {
filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
}
}
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
Name: "nodes_processor",
Durable: "nodes_processor",
//FilterSubjects: filterSubjectValues,
Name: "nodes_processor",
Durable: "nodes_processor",
FilterSubjects: filterSubjectValues,
})
if err != nil {
log.Fatalf("error: create or update consumer failed: %v\n", err)

View file

@ -446,6 +446,11 @@ func (s *server) directSAMSChRead() {
for i, sam := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriberNats)
if processName == "" {
fmt.Printf("error: processName was empty, sams[%v] was: %#v\n", i, sams[i])
os.Exit(1)
}
s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock()