mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
added flags/env for starting up jetstream processes
This commit is contained in:
parent
960a2c3588
commit
ab0b0fbe24
2 changed files with 137 additions and 138 deletions
|
@ -95,44 +95,33 @@ type Configuration struct {
|
||||||
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
|
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
|
||||||
|
|
||||||
StartProcesses StartProcesses
|
StartProcesses StartProcesses
|
||||||
|
// Comma separated list of additional streams to listen on.
|
||||||
Jetstreams string `comment:"Comma separated list of streams to consume messages from"`
|
Jetstreams string
|
||||||
}
|
}
|
||||||
|
|
||||||
type StartProcesses struct {
|
type StartProcesses struct {
|
||||||
// StartPubHello, sets the interval in seconds for how often we send hello messages to central server
|
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
|
||||||
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
|
|
||||||
// Enable the updates of public keys
|
|
||||||
EnableKeyUpdates bool `comment:"Enable the updates of public keys"`
|
EnableKeyUpdates bool `comment:"Enable the updates of public keys"`
|
||||||
|
|
||||||
// Enable the updates of acl's
|
|
||||||
EnableAclUpdates bool `comment:"Enable the updates of acl's"`
|
EnableAclUpdates bool `comment:"Enable the updates of acl's"`
|
||||||
|
|
||||||
// Start the central error logger.
|
IsCentralErrorLogger bool `comment:"Start the central error logger."`
|
||||||
IsCentralErrorLogger bool `comment:"Start the central error logger."`
|
StartSubHello bool `comment:"Start subscriber for hello messages"`
|
||||||
// Start subscriber for hello messages
|
StartSubFileAppend bool `comment:"Start subscriber for text logging"`
|
||||||
StartSubHello bool `comment:"Start subscriber for hello messages"`
|
StartSubFile bool `comment:"Start subscriber for writing to file"`
|
||||||
// Start subscriber for text logging
|
StartSubCopySrc bool `comment:"Start subscriber for reading files to copy"`
|
||||||
StartSubFileAppend bool `comment:"Start subscriber for text logging"`
|
StartSubCopyDst bool `comment:"Start subscriber for writing copied files to disk"`
|
||||||
// Start subscriber for writing to file
|
StartSubCliCommand bool `comment:"Start subscriber for CLICommand"`
|
||||||
StartSubFile bool `comment:"Start subscriber for writing to file"`
|
StartSubConsole bool `comment:"Start subscriber for Console"`
|
||||||
// Start subscriber for reading files to copy
|
StartSubHttpGet bool `comment:"Start subscriber for HttpGet"`
|
||||||
StartSubCopySrc bool `comment:"Start subscriber for reading files to copy"`
|
StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
|
||||||
// Start subscriber for writing copied files to disk
|
StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
|
||||||
StartSubCopyDst bool `comment:"Start subscriber for writing copied files to disk"`
|
StartJetstreamPublisher bool `comment:"Start the nats jetstream publisher"`
|
||||||
// Start subscriber for Echo Request
|
StartJetstreamConsumer bool `comment:"Start the nats jetstream consumer"`
|
||||||
StartSubCliCommand bool `comment:"Start subscriber for CLICommand"`
|
|
||||||
// Start subscriber for Console
|
|
||||||
StartSubConsole bool `comment:"Start subscriber for Console"`
|
|
||||||
// Start subscriber for HttpGet
|
|
||||||
StartSubHttpGet bool `comment:"Start subscriber for HttpGet"`
|
|
||||||
// Start subscriber for tailing log files
|
|
||||||
StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
|
|
||||||
// Start subscriber for continously delivery of output from cli commands.
|
|
||||||
StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
|
|
||||||
|
|
||||||
// IsCentralAuth, enable to make this instance take the role as the central auth server
|
// IsCentralAuth, enable to make this instance take the role as the central auth server
|
||||||
IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
|
IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
|
||||||
|
Jetstreams string `comment: "Comma separated list of additional streams to listen on"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfiguration will return a *Configuration.
|
// NewConfiguration will return a *Configuration.
|
||||||
|
@ -180,6 +169,8 @@ func NewConfiguration() *Configuration {
|
||||||
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
|
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
|
||||||
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
|
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
|
||||||
flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
|
flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
|
||||||
|
flag.BoolVar(&c.StartProcesses.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartProcesses.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher")
|
||||||
|
flag.BoolVar(&c.StartProcesses.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartProcesses.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer")
|
||||||
flag.StringVar(&c.Jetstreams, "jetstreams", CheckEnv("JETSTREAMS", c.Jetstreams).(string), "Comma separated list of Jetstrams to consume")
|
flag.StringVar(&c.Jetstreams, "jetstreams", CheckEnv("JETSTREAMS", c.Jetstreams).(string), "Comma separated list of Jetstrams to consume")
|
||||||
|
|
||||||
// Start of Request publishers/subscribers
|
// Start of Request publishers/subscribers
|
||||||
|
@ -258,21 +249,23 @@ func newConfigurationDefaults() Configuration {
|
||||||
Jetstreams: "",
|
Jetstreams: "",
|
||||||
|
|
||||||
StartProcesses: StartProcesses{
|
StartProcesses: StartProcesses{
|
||||||
StartPubHello: 30,
|
StartPubHello: 30,
|
||||||
EnableKeyUpdates: false,
|
EnableKeyUpdates: false,
|
||||||
EnableAclUpdates: false,
|
EnableAclUpdates: false,
|
||||||
IsCentralErrorLogger: false,
|
IsCentralErrorLogger: false,
|
||||||
StartSubHello: true,
|
StartSubHello: true,
|
||||||
StartSubFileAppend: true,
|
StartSubFileAppend: true,
|
||||||
StartSubFile: true,
|
StartSubFile: true,
|
||||||
StartSubCopySrc: true,
|
StartSubCopySrc: true,
|
||||||
StartSubCopyDst: true,
|
StartSubCopyDst: true,
|
||||||
StartSubCliCommand: true,
|
StartSubCliCommand: true,
|
||||||
StartSubConsole: true,
|
StartSubConsole: true,
|
||||||
StartSubHttpGet: true,
|
StartSubHttpGet: true,
|
||||||
StartSubTailFile: true,
|
StartSubTailFile: true,
|
||||||
StartSubCliCommandCont: true,
|
StartSubCliCommandCont: true,
|
||||||
IsCentralAuth: false,
|
IsCentralAuth: false,
|
||||||
|
StartJetstreamPublisher: true,
|
||||||
|
StartJetstreamConsumer: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
|
|
194
processes.go
194
processes.go
|
@ -343,65 +343,69 @@ func (p *processes) Start(proc process) {
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
// ProcFunc for Jetstream publishers.
|
// ProcFunc for Jetstream publishers.
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
|
if proc.configuration.StartProcesses.StartJetstreamPublisher {
|
||||||
js, err := jetstream.New(proc.natsConn)
|
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
if err != nil {
|
js, err := jetstream.New(proc.natsConn)
|
||||||
log.Fatalf("error: jetstream new failed: %v\n", err)
|
if err != nil {
|
||||||
}
|
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
|
_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
|
||||||
Name: "nodes",
|
Name: "nodes",
|
||||||
Description: "nodes stream",
|
Description: "nodes stream",
|
||||||
Subjects: []string{"nodes.>"},
|
Subjects: []string{"nodes.>"},
|
||||||
// Discard older messages and keep only the last one.
|
// Discard older messages and keep only the last one.
|
||||||
MaxMsgsPerSubject: 1,
|
MaxMsgsPerSubject: 1,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: jetstream create or update failed: %v\n", err)
|
log.Fatalf("error: jetstream create or update failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// REMOVE: Go routine for injecting messages for testing
|
||||||
|
// go func() {
|
||||||
|
// i := 0
|
||||||
|
// for {
|
||||||
|
// m := Message{
|
||||||
|
// ToNode: "btdev1",
|
||||||
|
// FromNode: proc.node,
|
||||||
|
// JetstreamToNode: "btdev1",
|
||||||
|
// Method: CliCommand,
|
||||||
|
// MethodArgs: []string{"/bin/ash", "-c", "tree"},
|
||||||
|
// ReplyMethod: Console,
|
||||||
|
// MethodTimeout: 3,
|
||||||
|
// //Data: []byte("some text in here............"),
|
||||||
|
// }
|
||||||
|
// proc.jetstreamOut <- m
|
||||||
|
//
|
||||||
|
// log.Printf("published message: %v\n", i)
|
||||||
|
// time.Sleep(time.Second * 1)
|
||||||
|
// i++
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }()
|
||||||
|
|
||||||
// REMOVE: Go routine for injecting messages for testing
|
|
||||||
go func() {
|
|
||||||
i := 0
|
|
||||||
for {
|
for {
|
||||||
m := Message{
|
// TODO:
|
||||||
ToNode: "btdev1",
|
select {
|
||||||
FromNode: proc.node,
|
case msg := <-proc.jetstreamOut:
|
||||||
Method: CliCommand,
|
b, err := json.Marshal(msg)
|
||||||
MethodArgs: []string{"/bin/ash", "-c", "tree"},
|
if err != nil {
|
||||||
ReplyMethod: Console,
|
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
|
||||||
MethodTimeout: 3,
|
}
|
||||||
//Data: []byte("some text in here............"),
|
|
||||||
|
subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
|
||||||
|
_, err = js.Publish(proc.ctx, subject, b)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
|
||||||
}
|
}
|
||||||
proc.jetstreamOut <- m
|
|
||||||
|
|
||||||
log.Printf("published message: %v\n", i)
|
|
||||||
time.Sleep(time.Second * 1)
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
// TODO:
|
|
||||||
select {
|
|
||||||
case msgJS := <-proc.jetstreamOut:
|
|
||||||
b, err := json.Marshal(msgJS)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = js.Publish(proc.ctx, "nodes.btdev1", b)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
|
||||||
}
|
}
|
||||||
proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
|
|
||||||
|
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
// Procfunc for Jetstream consumers.
|
// Procfunc for Jetstream consumers.
|
||||||
|
@ -412,58 +416,60 @@ func (p *processes) Start(proc process) {
|
||||||
// After a jetstream message is picked up from the stream, the steward message
|
// After a jetstream message is picked up from the stream, the steward message
|
||||||
// will be extracted from the data field, and the ctrl message will be put
|
// will be extracted from the data field, and the ctrl message will be put
|
||||||
// on the local delivery channel, and handled as a normal ctrl message.
|
// on the local delivery channel, and handled as a normal ctrl message.
|
||||||
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
|
if proc.configuration.StartProcesses.StartJetstreamConsumer {
|
||||||
js, err := jetstream.New(proc.natsConn)
|
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
if err != nil {
|
js, err := jetstream.New(proc.natsConn)
|
||||||
log.Fatalf("error: jetstream new failed: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := js.Stream(proc.ctx, "nodes")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("error: js.Stream failed: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
|
|
||||||
Name: "nodes_processor",
|
|
||||||
Durable: "nodes_processor",
|
|
||||||
FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
|
|
||||||
stewardMessage := Message{}
|
|
||||||
err := json.Unmarshal(msg.Data(), &stewardMessage)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
|
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
|
stream, err := js.Stream(proc.ctx, "nodes")
|
||||||
|
|
||||||
msg.Ack()
|
|
||||||
|
|
||||||
// Messages received here via jetstream are for this node. Put the message into
|
|
||||||
// a SubjectAndMessage structure, and we use the deliver local from here.
|
|
||||||
sam, err := newSubjectAndMessage(stewardMessage)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
|
log.Fatalf("error: js.Stream failed: %v\n", err)
|
||||||
}
|
}
|
||||||
proc.server.samSendLocalCh <- []subjectAndMessage{sam}
|
|
||||||
})
|
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
|
||||||
if err != nil {
|
Name: "nodes_processor",
|
||||||
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
Durable: "nodes_processor",
|
||||||
|
FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
|
||||||
|
stewardMessage := Message{}
|
||||||
|
err := json.Unmarshal(msg.Data(), &stewardMessage)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
|
||||||
|
|
||||||
|
msg.Ack()
|
||||||
|
|
||||||
|
// Messages received here via jetstream are for this node. Put the message into
|
||||||
|
// a SubjectAndMessage structure, and we use the deliver local from here.
|
||||||
|
sam, err := newSubjectAndMessage(stewardMessage)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
|
||||||
|
}
|
||||||
|
proc.server.samSendLocalCh <- []subjectAndMessage{sam}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer cctx.Stop()
|
||||||
|
|
||||||
|
<-proc.ctx.Done()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
defer cctx.Stop()
|
proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
|
||||||
|
|
||||||
<-proc.ctx.Done()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------------------------
|
// --------------------------------------------------
|
||||||
|
|
Loading…
Add table
Reference in a new issue