From dc3186285dc15bc99f19dbe1917963386aa5b46b Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 7 Apr 2022 21:43:00 +0200 Subject: [PATCH] added nats queue groups subscriber --- README.md | 5 +++++ process.go | 3 ++- requests.go | 7 +++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1c4f0e5..7da3b40 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ As long as you can do something as an operator on in a shell on a system you can - [Publishing and Subscribing processes](#publishing-and-subscribing-processes) - [Publisher](#publisher) - [Subscriber](#subscriber) + - [Load balancing](#load-balancing) - [Logical structure](#logical-structure) - [Terminology](#terminology) - [Features](#features) @@ -172,6 +173,10 @@ If one process hangs on a long running message method it will not affect the res 2. When a message have been received, a handler for the method type specified in the message will be executed. 3. If the output of the method called is supposed to be returned to the publiser it will do so by using the replyMethod specified. +### Load balancing + +Steward instances with the same **Nodename** will automatically load balance the handling of messages on a given subject, and any given message will only be handled once by one instance. + ### Logical structure TODO: Make a diagram here... diff --git a/process.go b/process.go index 45a0b8f..8635bc9 100644 --- a/process.go +++ b/process.go @@ -565,7 +565,8 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // on a node. func (p process) subscribeMessages() *nats.Subscription { subject := string(p.subject.name()) - natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { + natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) { //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // Start up the subscriber handler. diff --git a/requests.go b/requests.go index 707ff53..591b0c9 100644 --- a/requests.go +++ b/requests.go @@ -2133,6 +2133,13 @@ func (m methodREQPublicKeysPut) handler(proc process, message Message, node stri fmt.Printf(" *** RECEIVED KEYS: %v\n", keys) + // TODO: + // - We need to store the public keys in a map in signatures.go + // - We should also persist that public keys map to file, so we can read + // that file on startup to get all the previosly received keys. + // - The store to map and also that map to file should happen with a method + // on signatures.publicKeys, so we do both in one go. + // Prepare and queue for sending a new message with the output // of the action executed. // newReplyMessage(proc, message, out)