mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
added initial jetstream field to process struct, and renamed process.spawnWorker to process.start.
This commit is contained in:
parent
d5d1658cb4
commit
7b2d5fc96f
5 changed files with 25 additions and 15 deletions
28
process.go
28
process.go
|
@ -5,12 +5,14 @@ import (
|
|||
"crypto/ed25519"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
// "google.golang.org/protobuf/internal/errors"
|
||||
)
|
||||
|
@ -115,6 +117,8 @@ type process struct {
|
|||
errorKernel *errorKernel
|
||||
// metrics
|
||||
metrics *metrics
|
||||
// jetstream
|
||||
js jetstream.JetStream
|
||||
}
|
||||
|
||||
// prepareNewProcess will set the the provided values and the default
|
||||
|
@ -130,6 +134,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
|
||||
var method Method
|
||||
|
||||
js, err := jetstream.New(server.natsConn)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to create jetstream.New: %v\n", err)
|
||||
}
|
||||
|
||||
proc := process{
|
||||
server: server,
|
||||
messageID: 0,
|
||||
|
@ -149,6 +158,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
centralAuth: server.centralAuth,
|
||||
errorKernel: server.errorKernel,
|
||||
metrics: server.metrics,
|
||||
js: js,
|
||||
}
|
||||
|
||||
// We use the full name of the subject to identify a unique
|
||||
|
@ -172,7 +182,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
//
|
||||
// It will give the process the next available ID, and also add the
|
||||
// process to the processes map in the server structure.
|
||||
func (p process) spawnWorker() {
|
||||
func (p process) Start() {
|
||||
|
||||
// Add prometheus metrics for the process.
|
||||
if !p.isSubProcess {
|
||||
|
@ -188,7 +198,7 @@ func (p process) spawnWorker() {
|
|||
// Start a subscriber worker, which will start a go routine (process)
|
||||
// That will take care of all the messages for the subject it owns.
|
||||
if p.processKind == processKindSubscriber {
|
||||
p.startSubscriber()
|
||||
p.startSubscriberNats()
|
||||
}
|
||||
|
||||
// Add information about the new process to the started processes map.
|
||||
|
@ -221,7 +231,7 @@ func (p process) startPublisher() {
|
|||
go p.publishMessages(p.natsConn)
|
||||
}
|
||||
|
||||
func (p process) startSubscriber() {
|
||||
func (p process) startSubscriberNats() {
|
||||
// If there is a procFunc for the process, start it.
|
||||
if p.procFunc != nil {
|
||||
// Initialize the channel for communication between the proc and
|
||||
|
@ -239,7 +249,7 @@ func (p process) startSubscriber() {
|
|||
}()
|
||||
}
|
||||
|
||||
p.natsSubscription = p.subscribeMessages()
|
||||
p.natsSubscription = p.subscribeMessagesNats()
|
||||
|
||||
// We also need to be able to remove all the information about this process
|
||||
// when the process context is canceled.
|
||||
|
@ -710,17 +720,17 @@ func (p process) verifySigOrAclFlag(message Message) bool {
|
|||
// SubscribeMessage will register the Nats callback function for the specified
|
||||
// nats subject. This allows us to receive Nats messages for a given subject
|
||||
// on a node.
|
||||
func (p process) subscribeMessages() *nats.Subscription {
|
||||
func (p process) subscribeMessagesNats() *nats.Subscription {
|
||||
subject := string(p.subject.name())
|
||||
// natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
|
||||
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
|
||||
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
|
||||
|
||||
// Register the callback function that NATS will use when new messages arrive.
|
||||
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
|
||||
// Start up the subscriber handler.
|
||||
go p.messageSubscriberHandlerNats(p.natsConn, p.configuration.NodeName, msg, subject)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: Subscribe failed: %v", err)
|
||||
er := fmt.Errorf("error: nats queue subscribe failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -385,7 +385,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
|
|||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
|
||||
proc.procFunc = pf
|
||||
|
||||
go proc.spawnWorker()
|
||||
go proc.Start()
|
||||
}
|
||||
|
||||
// publisher will start a publisher process. It takes the initial process, request method,
|
||||
|
@ -398,7 +398,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
|
|||
proc.procFunc = pf
|
||||
proc.isLongRunningPublisher = true
|
||||
|
||||
go proc.spawnWorker()
|
||||
go proc.Start()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
|
|
|
@ -235,7 +235,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
|||
copySrcSubProc.handler = copySrcSubHandler()
|
||||
|
||||
// The process will be killed when the context expires.
|
||||
go copySrcSubProc.spawnWorker()
|
||||
go copySrcSubProc.Start()
|
||||
|
||||
// Send a message over the the node where the destination file will be written,
|
||||
// to also start up a sub process on the destination node.
|
||||
|
@ -362,7 +362,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
|
|||
copyDstSubProc.handler = copyDstSubHandler()
|
||||
|
||||
// The process will be killed when the context expires.
|
||||
go copyDstSubProc.spawnWorker()
|
||||
go copyDstSubProc.Start()
|
||||
|
||||
fp := filepath.Join(cia.DstDir, cia.DstFile)
|
||||
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName)
|
||||
|
|
|
@ -69,7 +69,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
|
|||
// Create the process and start it.
|
||||
sub := newSubject(method, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
|
||||
go procNew.spawnWorker()
|
||||
go procNew.Start()
|
||||
|
||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
er := fmt.Errorf("%v", txt)
|
||||
|
|
|
@ -577,7 +577,7 @@ func (s *server) routeMessagesToProcess() {
|
|||
proc = newProcess(s.ctx, s, sub, processKindPublisher)
|
||||
}
|
||||
|
||||
proc.spawnWorker()
|
||||
proc.Start()
|
||||
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
|
||||
s.errorKernel.logDebug(er)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue