1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 13:49:29 +00:00

initial poc of jetstream seems to be working. Rerouting messages with the jetstream field specified to the jetstream channel

This commit is contained in:
postmannen 2024-11-22 11:31:07 +01:00
parent 3a6a11ac71
commit 960a2c3588
4 changed files with 86 additions and 12 deletions

View file

@ -84,6 +84,9 @@ type Message struct {
PreviousMessage *Message
// Schedule
Schedule []int `json:"schedule" yaml:"schedule"`
// Is to be used with the stream subject to tell what nodes
// the the message is for.
JetstreamToNode string
}
// --- Subject

View file

@ -90,6 +90,8 @@ type process struct {
configuration *Configuration
// The new messages channel copied from *Server
newMessagesCh chan<- []subjectAndMessage
// JetstreamOut channel
jetstreamOut chan Message
// The structure who holds all processes information
processes *processes
// nats connection
@ -152,6 +154,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, stream str
processKind: processKind,
methodsAvailable: method.GetMethodsAvailable(),
newMessagesCh: server.newMessagesCh,
jetstreamOut: server.jetstreamOutCh,
configuration: server.configuration,
processes: server.processes,
natsConn: server.natsConn,

View file

@ -2,6 +2,7 @@ package ctrl
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
@ -339,7 +340,9 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, PublicKey, nil)
// --------------------------------------------------
// ProcFunc for Jetstream publishers.
// --------------------------------------------------
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
js, err := jetstream.New(proc.natsConn)
if err != nil {
@ -358,22 +361,57 @@ func (p *processes) Start(proc process) {
log.Fatalf("error: jetstream create or update failed: %v\n", err)
}
// REMOVE: Go routine for injecting messages for testing
go func() {
i := 0
for {
// TODO:
_, err := js.Publish(proc.ctx, "nodes.btdev1", []byte(fmt.Sprintf("message nr. %v", i)))
if err != nil {
log.Fatalf("error: js failed to publish message: %v\n", err)
m := Message{
ToNode: "btdev1",
FromNode: proc.node,
Method: CliCommand,
MethodArgs: []string{"/bin/ash", "-c", "tree"},
ReplyMethod: Console,
MethodTimeout: 3,
//Data: []byte("some text in here............"),
}
proc.jetstreamOut <- m
log.Printf("published message: %v\n", i)
time.Sleep(time.Second * 1)
i++
}
}()
for {
// TODO:
select {
case msgJS := <-proc.jetstreamOut:
b, err := json.Marshal(msgJS)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
}
_, err = js.Publish(proc.ctx, "nodes.btdev1", b)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
}
case <-ctx.Done():
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
}
}
}
proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
// --------------------------------------------------
// Procfunc for Jetstream consumers.
// --------------------------------------------------
// pfJetstreamConsumers connect to a given nats jetstream, and consume messages
// for the node on specified subjects within that stream.
// After a jetstream message is picked up from the stream, the steward message
// will be extracted from the data field, and the ctrl message will be put
// on the local delivery channel, and handled as a normal ctrl message.
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
js, err := jetstream.New(proc.natsConn)
if err != nil {
@ -386,8 +424,8 @@ func (p *processes) Start(proc process) {
}
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
Name: "order_processor",
Durable: "order_processor",
Name: "nodes_processor",
Durable: "nodes_processor",
FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"},
})
if err != nil {
@ -395,8 +433,23 @@ func (p *processes) Start(proc process) {
}
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
log.Printf("Received message: %v, with data: %v\n", string(msg.Subject()), string(msg.Data()))
stewardMessage := Message{}
err := json.Unmarshal(msg.Data(), &stewardMessage)
if err != nil {
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
}
log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
msg.Ack()
// Messages received here via jetstream are for this node. Put the message into
// a SubjectAndMessage structure, and we use the deliver local from here.
sam, err := newSubjectAndMessage(stewardMessage)
if err != nil {
log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
}
proc.server.samSendLocalCh <- []subjectAndMessage{sam}
})
if err != nil {
log.Fatalf("error: create or update consumer failed: %v\n", err)
@ -413,6 +466,8 @@ func (p *processes) Start(proc process) {
}
// --------------------------------------------------
// Stop all subscriber processes.
func (p *processes) Stop() {
log.Printf("info: canceling all subscriber processes...\n")
@ -422,6 +477,8 @@ func (p *processes) Stop() {
}
// ---------------------------------------------------------------------------------------
// Helper functions, and other
// ---------------------------------------------------------------------------------------
// Startup holds all the startup methods for subscribers.

View file

@ -49,6 +49,8 @@ type server struct {
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan []subjectAndMessage
// jetstreamOutCh
jetstreamOutCh chan Message
// directSAMSCh
samSendLocalCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if
@ -218,6 +220,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan []subjectAndMessage),
jetstreamOutCh: make(chan Message),
samSendLocalCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
@ -488,6 +491,14 @@ func (s *server) routeMessagesToProcess() {
for samSlice := range s.newMessagesCh {
for _, sam := range samSlice {
// If the message have the JetstreamToNode field specified
// deliver it via the jet stream processes, and abort trying
// to send it via the normal nats publisher.
if sam.Message.JetstreamToNode != "" {
s.jetstreamOutCh <- sam.Message
continue
}
go func(sam subjectAndMessage) {
s.messageID.mu.Lock()
s.messageID.id++