mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
added debug
This commit is contained in:
parent
b9934a7503
commit
ccac973422
4 changed files with 61 additions and 23 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -17,3 +17,4 @@ doc/concept/via/README.md
|
|||
notes.txt
|
||||
toolbox/
|
||||
signing/
|
||||
cmd/ctrl/ctrl
|
||||
|
|
|
@ -766,9 +766,9 @@ func (p process) publishMessagesNats(natsConn *nats.Conn) {
|
|||
select {
|
||||
case <-ticker.C:
|
||||
if p.isLongRunningPublisher {
|
||||
er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
|
||||
// er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logDebug(er)
|
||||
// p.errorKernel.logDebug(er)
|
||||
|
||||
continue
|
||||
}
|
||||
|
|
69
processes.go
69
processes.go
|
@ -2,9 +2,9 @@ package ctrl
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -346,6 +346,7 @@ func (p *processes) Start(proc process) {
|
|||
// --------------------------------------------------
|
||||
if proc.configuration.StartJetstreamPublisher {
|
||||
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
fmt.Printf("######## DEBUG: Publisher: beginning og jetstream publisher: %v\n", "#######")
|
||||
js, err := jetstream.New(proc.natsConn)
|
||||
if err != nil {
|
||||
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||
|
@ -356,8 +357,9 @@ func (p *processes) Start(proc process) {
|
|||
Description: "nodes stream",
|
||||
Subjects: []string{"nodes.>"},
|
||||
// Discard older messages and keep only the last one.
|
||||
MaxMsgsPerSubject: 1,
|
||||
// MaxMsgsPerSubject: 1,
|
||||
})
|
||||
fmt.Printf("######## DEBUG: Publisher: CreateOrUpdateStream: %v\n", "#######")
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("error: jetstream create or update failed: %v\n", err)
|
||||
|
@ -367,16 +369,24 @@ func (p *processes) Start(proc process) {
|
|||
// TODO:
|
||||
select {
|
||||
case msg := <-proc.jetstreamOut:
|
||||
b, err := proc.messageSerializeAndCompress(msg)
|
||||
fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg)
|
||||
// b, err := proc.messageSerializeAndCompress(msg)
|
||||
// if err != nil {
|
||||
// log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
|
||||
// }
|
||||
b, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
|
||||
fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###")
|
||||
_, err = js.Publish(proc.ctx, subject, b)
|
||||
if err != nil {
|
||||
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
|
||||
}
|
||||
fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###")
|
||||
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
|
||||
}
|
||||
|
@ -396,6 +406,10 @@ func (p *processes) Start(proc process) {
|
|||
// on the local delivery channel, and handled as a normal ctrl message.
|
||||
if proc.configuration.StartJetstreamConsumer {
|
||||
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
fmt.Println("---------------------------------------------------------------")
|
||||
fmt.Printf("--- DEBUG: consumer: starting up jetstream consumer %v\n", "---")
|
||||
fmt.Println("---------------------------------------------------------------")
|
||||
|
||||
js, err := jetstream.New(proc.natsConn)
|
||||
if err != nil {
|
||||
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||
|
@ -403,23 +417,35 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
stream, err := js.Stream(proc.ctx, "nodes")
|
||||
if err != nil {
|
||||
log.Fatalf("error: js.Stream failed: %v\n", err)
|
||||
log.Printf("error: js.Stream failed: %v\n", err)
|
||||
}
|
||||
|
||||
// stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
|
||||
// Name: "nodes",
|
||||
// Description: "nodes stream",
|
||||
// Subjects: []string{"nodes.>"},
|
||||
// // Discard older messages and keep only the last one.
|
||||
// MaxMsgsPerSubject: 1,
|
||||
// })
|
||||
if err != nil {
|
||||
log.Printf("error: js.Stream failed: %v\n", err)
|
||||
}
|
||||
|
||||
// Check for more subjects via flags to listen to, and if defined prefix all
|
||||
// the values with "nodes."
|
||||
filterSubjectValues := []string{
|
||||
fmt.Sprintf("nodes.%v", proc.server.nodeName),
|
||||
"nodes.all",
|
||||
//"nodes.all",
|
||||
}
|
||||
fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
|
||||
|
||||
// Check if there are more to consume defined in flags/env.
|
||||
if proc.configuration.JetstreamsConsume != "" {
|
||||
splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
|
||||
for i, v := range splitValues {
|
||||
filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
|
||||
}
|
||||
}
|
||||
//// Check if there are more to consume defined in flags/env.
|
||||
//if proc.configuration.JetstreamsConsume != "" {
|
||||
// splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
|
||||
// for i, v := range splitValues {
|
||||
// filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
|
||||
// }
|
||||
//}
|
||||
|
||||
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
|
||||
Name: "nodes_processor",
|
||||
|
@ -430,16 +456,24 @@ func (p *processes) Start(proc process) {
|
|||
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||
}
|
||||
|
||||
consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
|
||||
fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo)
|
||||
|
||||
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
|
||||
fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg)
|
||||
msg.Ack()
|
||||
|
||||
stewardMessage := Message{}
|
||||
stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
|
||||
// stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
|
||||
// if err != nil {
|
||||
// log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
|
||||
// }
|
||||
err := json.Unmarshal(msg.Data(), &stewardMessage)
|
||||
if err != nil {
|
||||
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
|
||||
}
|
||||
|
||||
log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
|
||||
|
||||
msg.Ack()
|
||||
log.Printf("----- Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
|
||||
|
||||
// Messages received here via jetstream are for this node. Put the message into
|
||||
// a SubjectAndMessage structure, and we use the deliver local from here.
|
||||
|
@ -447,7 +481,10 @@ func (p *processes) Start(proc process) {
|
|||
if err != nil {
|
||||
log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
|
||||
}
|
||||
|
||||
fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
|
||||
proc.server.samSendLocalCh <- []subjectAndMessage{sam}
|
||||
fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||
|
|
10
server.go
10
server.go
|
@ -217,17 +217,17 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
|||
|
||||
// Prepare the zstd encoder
|
||||
// Prepare the zstd encoder to put into processInitial
|
||||
var zEnc *zstd.Encoder
|
||||
// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
|
||||
|
||||
zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
|
||||
if err != nil {
|
||||
log.Fatalf("error: zstd new encoder failed: %v", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
zEnc.Close()
|
||||
defer func() {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
zstdEncoder.Close()
|
||||
}()
|
||||
}()
|
||||
|
||||
s := server{
|
||||
|
|
Loading…
Add table
Reference in a new issue