1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

renamed channels and publishers functions

This commit is contained in:
postmannen 2024-11-29 13:26:25 +01:00
parent 32565aad06
commit c2031a66e7
4 changed files with 64 additions and 37 deletions

32
TODO.md
View file

@ -1,4 +1,30 @@
TODO:
# TODO
- Create {{ file:<somefilehere> }} to be used within methodArguments
- Add option to send request types with Jetstream.
## file variable in methodArguments
Create {{ file:<somefilehere> }} to be used within methodArguments. The content of the file could be used directly via stdin to a command.
Examples with kubectl:
Use stdin directly.
`{{file:/my.yaml}}|kubectl -f -`, where the last dash makes kubectl use the content from stdin.
Write to a local file on the destination node first, then use that file.
`echo {{file:/my.yaml}}>/home/bt/mydeployments/dep1.yaml && kubectl -f /home/bt/mydeployments/dep1.yaml`,
## Key and ACL updates to use jetstream
## Problem: loosing access to file, connection to the broker
level=DEBUG msg="error: ack receive failed: waiting for 0 seconds before retrying: subject=errorCentral.errorLog: nats: timeout"
level=DEBUG msg="info: preparing to send nats message with subject errorCentral.errorLog, id: 233"
level=DEBUG msg="send attempt:8, max retries: 10, ack timeout: 60, message.ID: 233, method: errorLog, toNode: errorCentral"
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19866]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19868]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19870]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19872]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19874]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19876]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19878]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19880]
error signing nonce: unable to extract key pair from file "/Users/bt/ctrl/tmp/bt1/seed.txt": nats: open /Users/bt/ctrl/tmp/bt1/seed.txt: no such file or directory on connection [19882]
level=DEBUG msg="error: ack receive failed: waiting for 0 seconds before retrying: subject=errorCentral.errorLog: nats: timeout"

View file

@ -116,7 +116,7 @@ func (s *server) readStartupFolder() {
er = fmt.Errorf("%v", string(j))
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
s.samSendLocalCh <- sams
s.messageDeliverLocalCh <- sams
}
@ -220,7 +220,7 @@ func (s *server) jetstreamConsume() {
}
// If a message is received via
s.samSendLocalCh <- []subjectAndMessage{sam}
s.messageDeliverLocalCh <- []subjectAndMessage{sam}
})
defer consumeContext.Stop()

View file

@ -237,7 +237,7 @@ func (p process) startSubscriber() {
}()
}
p.natsSubscription = p.subscribeMessages()
p.natsSubscription = p.startNatsSubscriber()
// We also need to be able to remove all the information about this process
// when the process context is canceled.
@ -264,11 +264,12 @@ var (
ErrACKSubscribeRetry = errors.New("ctrl: retrying to subscribe for ack message")
)
// messageDeliverNats will create the Nats message with headers and payload.
// It will also take care of the delivering the message that is converted to
// gob or cbor format as a nats.Message. It will also take care of checking
// timeouts and retries specified for the message.
func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
// publishNats will create the Nats message with headers and payload.
// The payload of the nats message, which is the ctrl message will be
// serialized and compress before put in the data field of the nats
// message.
// It will also take care of resending if not delievered, and timeouts.
func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
retryAttempts := 0
if message.RetryWait <= 0 {
@ -435,7 +436,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// kind of message it is and then it will check how to handle that message type,
// and then call the correct method handler for it.
//
// This handler function should be started in it's own go routine,so
// This function should be started in it's own go routine,so
// one individual handler is started per message received so we can keep
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
@ -692,7 +693,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
// SubscribeMessage will register the Nats callback function for the specified
// nats subject. This allows us to receive Nats messages for a given subject
// on a node.
func (p process) subscribeMessages() *nats.Subscription {
func (p process) startNatsSubscriber() *nats.Subscription {
subject := string(p.subject.name())
// natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
@ -786,7 +787,7 @@ func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
// Create the Nats message with headers and payload, and do the
// sending of the message.
p.messageDeliverNats(b, natsMsgHeader, natsConn, m)
p.publishNats(b, natsMsgHeader, natsConn, m)
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.

View file

@ -51,8 +51,8 @@ type server struct {
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan subjectAndMessage
// directSAMSCh
samSendLocalCh chan []subjectAndMessage
// messageDeliverLocalCh
messageDeliverLocalCh chan []subjectAndMessage
// Channel for messages to publish with Jetstream.
jetstreamPublishCh chan Message
// errorKernel is doing all the error handling like what to do if
@ -228,23 +228,23 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
}()
s := server{
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan subjectAndMessage),
samSendLocalCh: make(chan []subjectAndMessage),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
zstdEncoder: zstdEncoder,
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan subjectAndMessage),
messageDeliverLocalCh: make(chan []subjectAndMessage),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
zstdEncoder: zstdEncoder,
}
s.processes = newProcesses(ctx, &s)
@ -374,7 +374,7 @@ func (s *server) Start() {
}
// Start the processing of new messages from an input channel.
s.routeMessagesToProcess()
s.routeMessagesToPublisherProcess()
// Start reading the channel for injecting direct messages that should
// not be sent via the message broker.
@ -433,7 +433,7 @@ func (s *server) directSAMSChRead() {
case <-s.ctx.Done():
log.Printf("info: stopped the directSAMSCh reader\n\n")
return
case sams := <-s.samSendLocalCh:
case sams := <-s.messageDeliverLocalCh:
// fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams)
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
@ -487,7 +487,7 @@ func (s *server) Stop() {
}
// routeMessagesToProcess takes a database name it's input argument.
// routeMessagesToPublisherProcess takes a database name it's input argument.
// The database will be used as the persistent k/v store for the work
// queue which is implemented as a ring buffer.
// The ringBufferInCh are where we get new messages to publish.
@ -496,7 +496,7 @@ func (s *server) Stop() {
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
func (s *server) routeMessagesToProcess() {
func (s *server) routeMessagesToPublisherProcess() {
// Start reading new fresh messages received on the incomming message
// pipe/file.