1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added nats queue groups subscriber

This commit is contained in:
postmannen 2022-04-07 21:43:00 +02:00
parent eaf164c9d7
commit dc3186285d
3 changed files with 14 additions and 1 deletions

View file

@ -21,6 +21,7 @@ As long as you can do something as an operator on in a shell on a system you can
- [Publishing and Subscribing processes](#publishing-and-subscribing-processes) - [Publishing and Subscribing processes](#publishing-and-subscribing-processes)
- [Publisher](#publisher) - [Publisher](#publisher)
- [Subscriber](#subscriber) - [Subscriber](#subscriber)
- [Load balancing](#load-balancing)
- [Logical structure](#logical-structure) - [Logical structure](#logical-structure)
- [Terminology](#terminology) - [Terminology](#terminology)
- [Features](#features) - [Features](#features)
@ -172,6 +173,10 @@ If one process hangs on a long running message method it will not affect the res
2. When a message have been received, a handler for the method type specified in the message will be executed. 2. When a message have been received, a handler for the method type specified in the message will be executed.
3. If the output of the method called is supposed to be returned to the publiser it will do so by using the replyMethod specified. 3. If the output of the method called is supposed to be returned to the publiser it will do so by using the replyMethod specified.
### Load balancing
Steward instances with the same **Nodename** will automatically load balance the handling of messages on a given subject, and any given message will only be handled once by one instance.
### Logical structure ### Logical structure
TODO: Make a diagram here... TODO: Make a diagram here...

View file

@ -565,7 +565,8 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// on a node. // on a node.
func (p process) subscribeMessages() *nats.Subscription { func (p process) subscribeMessages() *nats.Subscription {
subject := string(p.subject.name()) subject := string(p.subject.name())
natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// Start up the subscriber handler. // Start up the subscriber handler.

View file

@ -2133,6 +2133,13 @@ func (m methodREQPublicKeysPut) handler(proc process, message Message, node stri
fmt.Printf(" *** RECEIVED KEYS: %v\n", keys) fmt.Printf(" *** RECEIVED KEYS: %v\n", keys)
// TODO:
// - We need to store the public keys in a map in signatures.go
// - We should also persist that public keys map to file, so we can read
// that file on startup to get all the previosly received keys.
// - The store to map and also that map to file should happen with a method
// on signatures.publicKeys, so we do both in one go.
// Prepare and queue for sending a new message with the output // Prepare and queue for sending a new message with the output
// of the action executed. // of the action executed.
// newReplyMessage(proc, message, out) // newReplyMessage(proc, message, out)