1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

debugging print, and removed subject filter from consumer

This commit is contained in:
postmannen 2024-11-24 22:04:08 +01:00
parent ccac973422
commit 425553db57
3 changed files with 28 additions and 13 deletions

View file

@ -267,10 +267,17 @@ func (s *server) readFolder() {
}
fh.Close()
if len(b) == 0 {
return
}
fmt.Printf("!!!!!!!DEBUG: readfolder: 1: readbytes: %v\n", b)
b = bytes.Trim(b, "\x00")
// unmarshal the JSON into a struct
fmt.Printf("!!!!!!!DEBUG: readfolder: 2: readbytes: %v\n", b)
sams, err := s.convertBytesToSAMs(b)
fmt.Printf("!!!!!!!DEBUG: readfolder: done convertBytesToSAMs, got sams: %v\n", sams)
if err != nil {
er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
@ -289,12 +296,16 @@ func (s *server) readFolder() {
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
}
er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.newMessagesCh: %#v", sams)
s.errorKernel.logDebug(er)
// Send the SAM struct to be picked up by the ring buffer.
fmt.Print("!!!!!!!DEBUG: readfolder: Before putting on newMessagesCh\n")
s.newMessagesCh <- sams
fmt.Print("!!!!!!!DEBUG: readfolder: After putting on newMessagesCh\n")
fmt.Print("!!!!!!!DEBUG: readfolder: Before putting on auditLogCh\n")
s.auditLogCh <- sams
fmt.Print("!!!!!!!DEBUG: readfolder: After putting on auditLogCh\n")
// Delete the file.
err = os.Remove(event.Name)

View file

@ -380,12 +380,12 @@ func (p *processes) Start(proc process) {
}
subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###")
fmt.Printf("######## DEBUG: Publisher: before publish on subject: %v\n", subject)
_, err = js.Publish(proc.ctx, subject, b)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
}
fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###")
fmt.Printf("######## DEBUG: Publisher: after publish on subject: %v\n", subject)
case <-ctx.Done():
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
@ -433,11 +433,11 @@ func (p *processes) Start(proc process) {
// Check for more subjects via flags to listen to, and if defined prefix all
// the values with "nodes."
filterSubjectValues := []string{
fmt.Sprintf("nodes.%v", proc.server.nodeName),
//"nodes.all",
}
fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
////filterSubjectValues := []string{
//// fmt.Sprintf("nodes.%v", proc.server.nodeName),
//// //"nodes.all",
////}
////fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
//// Check if there are more to consume defined in flags/env.
//if proc.configuration.JetstreamsConsume != "" {
@ -448,16 +448,17 @@ func (p *processes) Start(proc process) {
//}
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
Name: "nodes_processor",
Durable: "nodes_processor",
FilterSubjects: filterSubjectValues,
Name: "nodes_processor",
Durable: "nodes_processor",
//FilterSubjects: filterSubjectValues,
})
if err != nil {
log.Fatalf("error: create or update consumer failed: %v\n", err)
}
consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo)
consumerInfo, _ := consumer.Info(proc.ctx)
fmt.Printf("--- DEBUG: consumer: consumerInfo: %v\n", consumerInfo)
//fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo)
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg)

View file

@ -520,8 +520,11 @@ func (s *server) routeMessagesToProcess() {
// If the message have the JetstreamToNode field specified
// deliver it via the jet stream processes, and abort trying
// to send it via the normal nats publisher.
fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess: checking if it is a jetstram message: %v\n", sam.Message.JetstreamToNode)
if sam.Message.JetstreamToNode != "" {
fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess: it was a jetstram message, putting it on jetstreamOutCh: %v\n", sam.Message.JetstreamToNode)
s.jetstreamOutCh <- sam.Message
fmt.Printf("$$$$$ DEBUG: routeMessagesToProcess:done putting it on jetstreamOutCh: %v\n", sam.Message.JetstreamToNode)
continue
}