1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 04:49:17 +00:00
This commit is contained in:
postmannen 2024-11-24 19:19:47 +01:00
parent 6c615591a6
commit 25295c4095
14 changed files with 502 additions and 378 deletions

1
.gitignore vendored
View file

@ -17,3 +17,4 @@ doc/concept/via/README.md
notes.txt notes.txt
toolbox/ toolbox/
signing/ signing/
cmd/ctrl/ctrl

View file

@ -76,10 +76,6 @@ type Configuration struct {
ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"` ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"`
// Retries for error messages // Retries for error messages
ErrorMessageRetries int `comment:"Retries for error messages"` ErrorMessageRetries int `comment:"Retries for error messages"`
// Compression z for zstd or g for gzip
Compression string `comment:"Compression z for zstd or g for gzip"`
// Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor
Serialization string `comment:"Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor"`
// SetBlockProfileRate for block profiling // SetBlockProfileRate for block profiling
SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"` SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"`
// EnableSocket for enabling the creation of a ctrl.sock file // EnableSocket for enabling the creation of a ctrl.sock file
@ -88,9 +84,6 @@ type Configuration struct {
EnableSignatureCheck bool `comment:"EnableSignatureCheck to enable signature checking"` EnableSignatureCheck bool `comment:"EnableSignatureCheck to enable signature checking"`
// EnableAclCheck to enable ACL checking // EnableAclCheck to enable ACL checking
EnableAclCheck bool `comment:"EnableAclCheck to enable ACL checking"` EnableAclCheck bool `comment:"EnableAclCheck to enable ACL checking"`
// EnableDebug will also enable printing all the messages received in the errorKernel to STDERR.
EnableDebug bool `comment:"EnableDebug will also enable printing all the messages received in the errorKernel to STDERR."`
// LogLevel // LogLevel
LogLevel string `comment:"LogLevel error/info/warning/debug/none."` LogLevel string `comment:"LogLevel error/info/warning/debug/none."`
LogConsoleTimestamps bool `comment:"LogConsoleTimestamps true/false for enabling or disabling timestamps when printing errors and information to stderr"` LogConsoleTimestamps bool `comment:"LogConsoleTimestamps true/false for enabling or disabling timestamps when printing errors and information to stderr"`
@ -101,40 +94,27 @@ type Configuration struct {
// it have not received any messages for the given amount of time. // it have not received any messages for the given amount of time.
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."` KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
StartProcesses StartProcesses // Comma separated list of additional streams to consume from.
} JetstreamsConsume string
type StartProcesses struct { StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
// StartPubHello, sets the interval in seconds for how often we send hello messages to central server
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
// Enable the updates of public keys
EnableKeyUpdates bool `comment:"Enable the updates of public keys"` EnableKeyUpdates bool `comment:"Enable the updates of public keys"`
// Enable the updates of acl's
EnableAclUpdates bool `comment:"Enable the updates of acl's"` EnableAclUpdates bool `comment:"Enable the updates of acl's"`
// Start the central error logger. IsCentralErrorLogger bool `comment:"Start the central error logger"`
IsCentralErrorLogger bool `comment:"Start the central error logger."` StartSubHello bool `comment:"Start subscriber for hello messages"`
// Start subscriber for hello messages StartSubFileAppend bool `comment:"Start subscriber for text logging"`
StartSubHello bool `comment:"Start subscriber for hello messages"` StartSubFile bool `comment:"Start subscriber for writing to file"`
// Start subscriber for text logging StartSubCopySrc bool `comment:"Start subscriber for reading files to copy"`
StartSubFileAppend bool `comment:"Start subscriber for text logging"` StartSubCopyDst bool `comment:"Start subscriber for writing copied files to disk"`
// Start subscriber for writing to file StartSubCliCommand bool `comment:"Start subscriber for CLICommand"`
StartSubFile bool `comment:"Start subscriber for writing to file"` StartSubConsole bool `comment:"Start subscriber for Console"`
// Start subscriber for reading files to copy StartSubHttpGet bool `comment:"Start subscriber for HttpGet"`
StartSubCopySrc bool `comment:"Start subscriber for reading files to copy"` StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
// Start subscriber for writing copied files to disk StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
StartSubCopyDst bool `comment:"Start subscriber for writing copied files to disk"` StartJetstreamPublisher bool `comment:"Start the nats jetstream publisher"`
// Start subscriber for Echo Request StartJetstreamConsumer bool `comment:"Start the nats jetstream consumer"`
StartSubCliCommand bool `comment:"Start subscriber for CLICommand"`
// Start subscriber for Console
StartSubConsole bool `comment:"Start subscriber for Console"`
// Start subscriber for HttpGet
StartSubHttpGet bool `comment:"Start subscriber for HttpGet"`
// Start subscriber for tailing log files
StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
// Start subscriber for continously delivery of output from cli commands.
StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
// IsCentralAuth, enable to make this instance take the role as the central auth server // IsCentralAuth, enable to make this instance take the role as the central auth server
IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"` IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
@ -177,43 +157,46 @@ func NewConfiguration() *Configuration {
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out") flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out")
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it") flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it")
flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled") flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file") flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.") flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.") flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.")
flag.BoolVar(&c.StartProcesses.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.StartProcesses.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server")
flag.BoolVar(&c.EnableDebug, "enableDebug", CheckEnv("ENABLE_DEBUG", c.EnableDebug).(bool), "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR")
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none") flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr") flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish") flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from")
// Start of Request publishers/subscribers // Start of Request publishers/subscribers
flag.IntVar(&c.StartProcesses.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartProcesses.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds") flag.IntVar(&c.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds")
flag.BoolVar(&c.StartProcesses.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.StartProcesses.EnableKeyUpdates).(bool), "true/false") flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.EnableKeyUpdates).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.StartProcesses.EnableAclUpdates).(bool), "true/false") flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.EnableAclUpdates).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.StartProcesses.IsCentralErrorLogger).(bool), "true/false") flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.IsCentralErrorLogger).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartProcesses.StartSubHello).(bool), "true/false") flag.BoolVar(&c.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartSubHello).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartProcesses.StartSubFileAppend).(bool), "true/false") flag.BoolVar(&c.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartSubFileAppend).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartProcesses.StartSubFile).(bool), "true/false") flag.BoolVar(&c.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartSubFile).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartProcesses.StartSubCopySrc).(bool), "true/false") flag.BoolVar(&c.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartSubCopySrc).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartProcesses.StartSubCopyDst).(bool), "true/false") flag.BoolVar(&c.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartSubCopyDst).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartProcesses.StartSubCliCommand).(bool), "true/false") flag.BoolVar(&c.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartSubCliCommand).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartProcesses.StartSubConsole).(bool), "true/false") flag.BoolVar(&c.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartSubConsole).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartProcesses.StartSubHttpGet).(bool), "true/false") flag.BoolVar(&c.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartSubHttpGet).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartProcesses.StartSubTailFile).(bool), "true/false") flag.BoolVar(&c.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartSubTailFile).(bool), "true/false")
flag.BoolVar(&c.StartProcesses.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartProcesses.StartSubCliCommandCont).(bool), "true/false") flag.BoolVar(&c.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartSubCliCommandCont).(bool), "true/false")
flag.BoolVar(&c.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher")
flag.BoolVar(&c.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer")
// Check that mandatory flag values have been set. // Check that mandatory flag values have been set.
switch { switch {
case c.NodeName == "": case c.NodeName == "":
log.Fatalf("error: the nodeName config option or flag cannot be empty, check -help\n") log.Fatalf("error: the nodeName config option or flag cannot be empty, check -help\n")
case c.CentralNodeName == "": case c.CentralNodeName == "":
// TODO: Check out if we should drop to have this as a mandatory flag?
log.Fatalf("error: the centralNodeName config option or flag cannot be empty, check -help\n") log.Fatalf("error: the centralNodeName config option or flag cannot be empty, check -help\n")
} }
@ -254,34 +237,32 @@ func newConfigurationDefaults() Configuration {
ExposeDataFolder: "", ExposeDataFolder: "",
ErrorMessageTimeout: 60, ErrorMessageTimeout: 60,
ErrorMessageRetries: 10, ErrorMessageRetries: 10,
Compression: "z",
Serialization: "cbor",
SetBlockProfileRate: 0, SetBlockProfileRate: 0,
EnableSocket: true, EnableSocket: true,
EnableSignatureCheck: false, EnableSignatureCheck: false,
EnableAclCheck: false, EnableAclCheck: false,
EnableDebug: false,
LogLevel: "debug", LogLevel: "debug",
LogConsoleTimestamps: false, LogConsoleTimestamps: false,
KeepPublishersAliveFor: 10, KeepPublishersAliveFor: 10,
JetstreamsConsume: "",
StartProcesses: StartProcesses{ StartPubHello: 30,
StartPubHello: 30, EnableKeyUpdates: false,
EnableKeyUpdates: false, EnableAclUpdates: false,
EnableAclUpdates: false, IsCentralErrorLogger: false,
IsCentralErrorLogger: false, StartSubHello: true,
StartSubHello: true, StartSubFileAppend: true,
StartSubFileAppend: true, StartSubFile: true,
StartSubFile: true, StartSubCopySrc: true,
StartSubCopySrc: true, StartSubCopyDst: true,
StartSubCopyDst: true, StartSubCliCommand: true,
StartSubCliCommand: true, StartSubConsole: true,
StartSubConsole: true, StartSubHttpGet: true,
StartSubHttpGet: true, StartSubTailFile: true,
StartSubTailFile: true, StartSubCliCommandCont: true,
StartSubCliCommandCont: true, IsCentralAuth: false,
IsCentralAuth: false, StartJetstreamPublisher: true,
}, StartJetstreamConsumer: true,
} }
return c return c
} }

12
go.mod
View file

@ -10,14 +10,14 @@ require (
github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail v1.0.0
github.com/jinzhu/copier v0.4.0 github.com/jinzhu/copier v0.4.0
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/klauspost/compress v1.17.0 github.com/klauspost/compress v1.17.11
github.com/nats-io/nats-server/v2 v2.8.4 github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.25.0 github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.4 github.com/nats-io/nkeys v0.4.7
github.com/pkg/profile v1.7.0 github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_golang v1.14.0
go.etcd.io/bbolt v1.3.7 go.etcd.io/bbolt v1.3.7
golang.org/x/crypto v0.7.0 golang.org/x/crypto v0.29.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 golang.org/x/exp v0.0.0-20230321023759-10a507213a29
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@ -39,8 +39,8 @@ require (
github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sys v0.6.0 // indirect golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.8.0 // indirect golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/protobuf v1.30.0 // indirect google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect

12
go.sum
View file

@ -45,6 +45,8 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
@ -65,9 +67,13 @@ github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBri
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@ -102,6 +108,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@ -116,6 +124,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
@ -124,6 +134,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -84,6 +84,9 @@ type Message struct {
PreviousMessage *Message PreviousMessage *Message
// Schedule // Schedule
Schedule []int `json:"schedule" yaml:"schedule"` Schedule []int `json:"schedule" yaml:"schedule"`
// Is to be used with the stream subject to tell what nodes
// the the message is for.
JetstreamToNode string `json:"jetstreamToNode" yaml:"jetstreamToNode"`
} }
// --- Subject // --- Subject
@ -149,3 +152,16 @@ type subjectName string
func (s Subject) name() subjectName { func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s", s.ToNode, s.Method)) return subjectName(fmt.Sprintf("%s.%s", s.ToNode, s.Method))
} }
type streamInfo struct {
name string
subjects []string
}
func newStreamInfo(name string, subjects []string) streamInfo {
s := streamInfo{
name: name,
subjects: subjects,
}
return s
}

68
process-jetstream.go Normal file
View file

@ -0,0 +1,68 @@
package ctrl
import (
"fmt"
"github.com/fxamacker/cbor/v2"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go/jetstream"
)
// messageDeserializeAndUncompress will deserialize the ctrl message
func (p *process) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) {
// Variable to hold a copy of the message data.
msgData := msg.Data()
// If debugging is enabled, print the source node name of the nats messages received.
headerFromNode := msg.Headers().Get("fromNode")
if headerFromNode != "" {
er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
p.errorKernel.logDebug(er)
}
zr, err := zstd.NewReader(nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err)
return Message{}, er
}
msgData, err = zr.DecodeAll(msgData, nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err)
zr.Close()
return Message{}, er
}
zr.Close()
message := Message{}
err = cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err)
return Message{}, er
}
return message, nil
}
// messageSerializeAndCompress will serialize and compress the Message, and
// return the result as a []byte.
func (p *process) messageSerializeAndCompress(msg Message) ([]byte, error) {
// encode the message structure into cbor
bSerialized, err := cbor.Marshal(msg)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.errorKernel.logDebug(er)
return nil, er
}
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
bCompressed := p.server.zstdEncoder.EncodeAll(bSerialized, nil)
return bCompressed, nil
}

View file

@ -1,21 +1,17 @@
package ctrl package ctrl
import ( import (
"bytes"
"compress/gzip"
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"encoding/gob"
"errors" "errors"
"fmt" "fmt"
"io" "log"
"os"
"sync"
"time" "time"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
// "google.golang.org/protobuf/internal/errors" // "google.golang.org/protobuf/internal/errors"
) )
@ -26,8 +22,10 @@ import (
type processKind string type processKind string
const ( const (
processKindSubscriber processKind = "subscriber" processKindSubscriberNats processKind = "subscriberNats"
processKindPublisher processKind = "publisher" processKindPublisherNats processKind = "publisherNats"
processKindConsumerJetstream processKind = "consumerJetstream"
processKindPublisherJetstream processKind = "publisherJetstream"
) )
// process holds all the logic to handle a message type and it's // process holds all the logic to handle a message type and it's
@ -45,8 +43,10 @@ type process struct {
messageID int messageID int
// the subject used for the specific process. One process // the subject used for the specific process. One process
// can contain only one sender on a message bus, hence // can contain only one sender on a message bus, hence
// also one subject // also one subject.
subject Subject subject Subject
// The jetstram stream.
streamInfo streamInfo
// Put a node here to be able know the node a process is at. // Put a node here to be able know the node a process is at.
node Node node Node
// The processID for the current process // The processID for the current process
@ -89,6 +89,8 @@ type process struct {
configuration *Configuration configuration *Configuration
// The new messages channel copied from *Server // The new messages channel copied from *Server
newMessagesCh chan<- []subjectAndMessage newMessagesCh chan<- []subjectAndMessage
// JetstreamOut channel
jetstreamOut chan Message
// The structure who holds all processes information // The structure who holds all processes information
processes *processes processes *processes
// nats connection // nats connection
@ -120,11 +122,15 @@ type process struct {
errorKernel *errorKernel errorKernel *errorKernel
// metrics // metrics
metrics *metrics metrics *metrics
// jetstream
js jetstream.JetStream
// zstd encoder
zstdEncoder *zstd.Encoder
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { func newProcess(ctx context.Context, server *server, subject Subject, stream streamInfo, processKind processKind) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
server.processes.mu.Lock() server.processes.mu.Lock()
server.processes.lastProcessID++ server.processes.lastProcessID++
@ -135,6 +141,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
var method Method var method Method
js, err := jetstream.New(server.natsConn)
if err != nil {
log.Fatalf("error: failed to create jetstream.New: %v\n", err)
}
proc := process{ proc := process{
server: server, server: server,
messageID: 0, messageID: 0,
@ -144,6 +155,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
processKind: processKind, processKind: processKind,
methodsAvailable: method.GetMethodsAvailable(), methodsAvailable: method.GetMethodsAvailable(),
newMessagesCh: server.newMessagesCh, newMessagesCh: server.newMessagesCh,
jetstreamOut: server.jetstreamOutCh,
configuration: server.configuration, configuration: server.configuration,
processes: server.processes, processes: server.processes,
natsConn: server.natsConn, natsConn: server.natsConn,
@ -154,30 +166,32 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
centralAuth: server.centralAuth, centralAuth: server.centralAuth,
errorKernel: server.errorKernel, errorKernel: server.errorKernel,
metrics: server.metrics, metrics: server.metrics,
js: js,
zstdEncoder: server.zstdEncoder,
} }
// We use the full name of the subject to identify a unique // We use the name of the subject to identify a unique process.
// process. We can do that since a process can only handle
// one message queue.
if proc.processKind == processKindPublisher { switch proc.processKind {
proc.processName = processNameGet(proc.subject.name(), processKindPublisher) case processKindPublisherNats:
} proc.processName = processNameGet(proc.subject.name(), processKindPublisherNats)
if proc.processKind == processKindSubscriber { case processKindSubscriberNats:
proc.processName = processNameGet(proc.subject.name(), processKindSubscriber) proc.processName = processNameGet(proc.subject.name(), processKindSubscriberNats)
case processKindConsumerJetstream:
proc.processName = processNameGet(subjectName(proc.streamInfo.name), processKindConsumerJetstream)
case processKindPublisherJetstream:
proc.processName = processNameGet(subjectName(proc.streamInfo.name), processKindPublisherJetstream)
} }
return proc return proc
} }
// The purpose of this function is to check if we should start a // Start a publisher or subscriber process, where a process is a go routine
// publisher or subscriber process, where a process is a go routine // that will handle either sending or receiving messages on one subject.
// that will handle either sending or receiving messages on one
// subject.
// //
// It will give the process the next available ID, and also add the // It will give the process the next available ID, and also add the
// process to the processes map in the server structure. // process to the processes map in the server structure.
func (p process) spawnWorker() { func (p process) Start() {
// Add prometheus metrics for the process. // Add prometheus metrics for the process.
if !p.isSubProcess { if !p.isSubProcess {
@ -186,14 +200,14 @@ func (p process) spawnWorker() {
// Start a publisher worker, which will start a go routine (process) // Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
if p.processKind == processKindPublisher { if p.processKind == processKindPublisherNats {
p.startPublisher() p.startPublisherNats()
} }
// Start a subscriber worker, which will start a go routine (process) // Start a subscriber worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
if p.processKind == processKindSubscriber { if p.processKind == processKindSubscriberNats {
p.startSubscriber() p.startSubscriberNats()
} }
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
@ -205,7 +219,7 @@ func (p process) spawnWorker() {
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)
} }
func (p process) startPublisher() { func (p process) startPublisherNats() {
// If there is a procFunc for the process, start it. // If there is a procFunc for the process, start it.
if p.procFunc != nil { if p.procFunc != nil {
// Initialize the channel for communication between the proc and // Initialize the channel for communication between the proc and
@ -223,10 +237,10 @@ func (p process) startPublisher() {
}() }()
} }
go p.publishMessages(p.natsConn) go p.publishMessagesNats(p.natsConn)
} }
func (p process) startSubscriber() { func (p process) startSubscriberNats() {
// If there is a procFunc for the process, start it. // If there is a procFunc for the process, start it.
if p.procFunc != nil { if p.procFunc != nil {
// Initialize the channel for communication between the proc and // Initialize the channel for communication between the proc and
@ -244,7 +258,7 @@ func (p process) startSubscriber() {
}() }()
} }
p.natsSubscription = p.subscribeMessages() p.natsSubscription = p.subscribeMessagesNats()
// We also need to be able to remove all the information about this process // We also need to be able to remove all the information about this process
// when the process context is canceled. // when the process context is canceled.
@ -447,102 +461,42 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// the state of the message being processed, and then reply back to the // the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct // correct sending process's reply, meaning so we ACK back to the correct
// publisher. // publisher.
func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) { func (p process) messageSubscriberHandlerNats(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) {
// Variable to hold a copy of the message data, so we don't mess with // Variable to hold a copy of the message data, so we don't mess with
// the original data since the original is a pointer value. // the original data since the original is a pointer value.
msgData := make([]byte, len(msg.Data)) msgData := make([]byte, len(msg.Data))
copy(msgData, msg.Data) copy(msgData, msg.Data)
// fmt.Printf(" * DEBUG: header value on subscriberHandler: %v\n", msg.Header)
// If debugging is enabled, print the source node name of the nats messages received. // If debugging is enabled, print the source node name of the nats messages received.
if val, ok := msg.Header["fromNode"]; ok { if val, ok := msg.Header["fromNode"]; ok {
er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject) er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject)
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)
} }
// If compression is used, decompress it to get the gob data. If zr, err := zstd.NewReader(nil)
// compression is not used it is the gob encoded data we already if err != nil {
// got in msgData so we do nothing with it. er := fmt.Errorf("error: zstd NewReader failed: %v", err)
if val, ok := msg.Header["cmp"]; ok { p.errorKernel.errSend(p, Message{}, er, logWarning)
switch val[0] { return
case "z":
zr, err := zstd.NewReader(nil)
if err != nil {
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
return
}
msgData, err = zr.DecodeAll(msg.Data, nil)
if err != nil {
er := fmt.Errorf("error: zstd decoding failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
zr.Close()
return
}
zr.Close()
case "g":
r := bytes.NewReader(msgData)
gr, err := gzip.NewReader(r)
if err != nil {
er := fmt.Errorf("error: gzip NewReader failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logError)
return
}
b, err := io.ReadAll(gr)
if err != nil {
er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
return
}
gr.Close()
msgData = b
}
} }
msgData, err = zr.DecodeAll(msg.Data, nil)
if err != nil {
er := fmt.Errorf("error: zstd decoding failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
zr.Close()
return
}
zr.Close()
message := Message{} message := Message{}
// Check if serialization is specified. err = cbor.Unmarshal(msgData, &message)
// Will default to gob serialization if nothing or non existing value is specified. if err != nil {
if val, ok := msg.Header["serial"]; ok { er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) p.errorKernel.errSend(p, message, er, logError)
switch val[0] { return
case "cbor":
err := cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
default: // Deaults to gob if no match was found.
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
}
} else {
// Default to gob if serialization flag was not specified.
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
} }
// Check if it is an ACK or NACK message, and do the appropriate action accordingly. // Check if it is an ACK or NACK message, and do the appropriate action accordingly.
@ -775,17 +729,17 @@ func (p process) verifySigOrAclFlag(message Message) bool {
// SubscribeMessage will register the Nats callback function for the specified // SubscribeMessage will register the Nats callback function for the specified
// nats subject. This allows us to receive Nats messages for a given subject // nats subject. This allows us to receive Nats messages for a given subject
// on a node. // on a node.
func (p process) subscribeMessages() *nats.Subscription { func (p process) subscribeMessagesNats() *nats.Subscription {
subject := string(p.subject.name()) subject := string(p.subject.name())
// natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// Register the callback function that NATS will use when new messages arrive.
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
// Start up the subscriber handler. // Start up the subscriber handler.
go p.messageSubscriberHandler(p.natsConn, p.configuration.NodeName, msg, subject) go p.messageSubscriberHandlerNats(p.natsConn, p.configuration.NodeName, msg, subject)
}) })
if err != nil { if err != nil {
er := fmt.Errorf("error: Subscribe failed: %v", err) er := fmt.Errorf("error: nats queue subscribe failed: %v", err)
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)
return nil return nil
} }
@ -796,26 +750,7 @@ func (p process) subscribeMessages() *nats.Subscription {
// publishMessages will do the publishing of messages for one single // publishMessages will do the publishing of messages for one single
// process. The function should be run as a goroutine, and will run // process. The function should be run as a goroutine, and will run
// as long as the process it belongs to is running. // as long as the process it belongs to is running.
func (p process) publishMessages(natsConn *nats.Conn) { func (p process) publishMessagesNats(natsConn *nats.Conn) {
var once sync.Once
var zEnc *zstd.Encoder
// Prepare a zstd encoder if enabled. By enabling it here before
// looping over the messages to send below, we can reuse the zstd
// encoder for all messages.
switch p.configuration.Compression {
case "z": // zstd
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
p.errorKernel.logError(er)
os.Exit(1)
}
zEnc = enc
defer zEnc.Close()
}
// Adding a timer that will be used for when to remove the sub process // Adding a timer that will be used for when to remove the sub process
// publisher. The timer is reset each time a message is published with // publisher. The timer is reset each time a message is published with
@ -831,9 +766,9 @@ func (p process) publishMessages(natsConn *nats.Conn) {
select { select {
case <-ticker.C: case <-ticker.C:
if p.isLongRunningPublisher { if p.isLongRunningPublisher {
er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName) // er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
p.errorKernel.logDebug(er) // p.errorKernel.logDebug(er)
continue continue
} }
@ -859,7 +794,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
m.ArgSignature = p.addMethodArgSignature(m) m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
go p.publishAMessage(m, zEnc, &once, natsConn) go p.publishAMessageNats(m, natsConn)
case <-p.ctx.Done(): case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.processName) er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
@ -876,107 +811,24 @@ func (p process) addMethodArgSignature(m Message) []byte {
return sign return sign
} }
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) { func (p process) publishAMessageNats(m Message, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the // Create the initial header, and set values below depending on the
// various configuration options chosen. // various configuration options chosen.
natsMsgHeader := make(nats.Header) natsMsgHeader := make(nats.Header)
natsMsgHeader["fromNode"] = []string{string(p.node)} natsMsgHeader["fromNode"] = []string{string(p.node)}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into gob binary format before putting
// it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop
switch p.configuration.Serialization {
case "cbor":
b, err := cbor.Marshal(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.errorKernel.logDebug(er)
return
}
natsMsgPayloadSerialized = b
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
default:
var bufGob bytes.Buffer
gobEnc := gob.NewEncoder(&bufGob)
err := gobEnc.Encode(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
p.errorKernel.logDebug(er)
return
}
natsMsgPayloadSerialized = bufGob.Bytes()
natsMsgHeader["serial"] = []string{"gob"}
}
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name(), processKindPublisherNats)
// The compressed value of the nats message payload. The content serCmp, err := p.messageSerializeAndCompress(m)
// can either be compressed or in it's original form depening on if err != nil {
// the outcome of the switch below, and if compression were chosen log.Fatalf("messageSerializeAndCompress: error: %v\n", err)
// or not.
var natsMsgPayloadCompressed []byte
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
switch p.configuration.Compression {
case "z": // zstd
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
// p.zEncMutex.Lock()
// zEnc.Reset(nil)
// p.zEncMutex.Unlock()
case "g": // gzip
var buf bytes.Buffer
func() {
gzipW := gzip.NewWriter(&buf)
defer gzipW.Close()
defer gzipW.Flush()
_, err := gzipW.Write(natsMsgPayloadSerialized)
if err != nil {
er := fmt.Errorf("error: failed to write gzip: %v", err)
p.errorKernel.logDebug(er)
return
}
}()
natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
default: // no compression
// Allways log the error to console.
er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
p.errorKernel.logDebug(er)
// We only wan't to send the error message to errorCentral once.
once.Do(func() {
p.errorKernel.logDebug(er)
})
// No compression, so we just assign the value of the serialized
// data directly to the variable used with messageDeliverNats.
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
} }
// Create the Nats message with headers and payload, and do the // Create the Nats message with headers and payload, and do the
// sending of the message. // sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) p.messageDeliverNats(serCmp, natsMsgHeader, natsConn, m)
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
p.messageID++ p.messageID++
@ -986,34 +838,4 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once,
p.processes.active.procNames[pn] = p p.processes.active.procNames[pn] = p
p.processes.active.mu.Unlock() p.processes.active.mu.Unlock()
} }
// // Handle the error.
// //
// // NOTE: None of the processes above generate an error, so the the
// // if clause will never be triggered. But keeping it here as an example
// // for now for how to handle errors.
// if err != nil {
// // Create an error type which also creates a channel which the
// // errorKernel will send back the action about what to do.
// ep := errorEvent{
// //errorType: logOnly,
// process: p,
// message: m,
// errorActionCh: make(chan errorAction),
// }
// p.errorCh <- ep
//
// // Wait for the response action back from the error kernel, and
// // decide what to do. Should we continue, quit, or .... ?
// switch <-ep.errorActionCh {
// case errActionContinue:
// // Just log and continue
// log.Printf("The errAction was continue...so we're continuing\n")
// case errActionKill:
// log.Printf("The errAction was kill...so we're killing\n")
// // ....
// default:
// log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n")
// }
// }
} }

View file

@ -2,11 +2,13 @@ package ctrl
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"log" "log"
"sync" "sync"
"time" "time"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -97,23 +99,23 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, OpProcessStop, nil) proc.startup.subscriber(proc, OpProcessStop, nil)
proc.startup.subscriber(proc, Test, nil) proc.startup.subscriber(proc, Test, nil)
if proc.configuration.StartProcesses.StartSubFileAppend { if proc.configuration.StartSubFileAppend {
proc.startup.subscriber(proc, FileAppend, nil) proc.startup.subscriber(proc, FileAppend, nil)
} }
if proc.configuration.StartProcesses.StartSubFile { if proc.configuration.StartSubFile {
proc.startup.subscriber(proc, File, nil) proc.startup.subscriber(proc, File, nil)
} }
if proc.configuration.StartProcesses.StartSubCopySrc { if proc.configuration.StartSubCopySrc {
proc.startup.subscriber(proc, CopySrc, nil) proc.startup.subscriber(proc, CopySrc, nil)
} }
if proc.configuration.StartProcesses.StartSubCopyDst { if proc.configuration.StartSubCopyDst {
proc.startup.subscriber(proc, CopyDst, nil) proc.startup.subscriber(proc, CopyDst, nil)
} }
if proc.configuration.StartProcesses.StartSubHello { if proc.configuration.StartSubHello {
// subREQHello is the handler that is triggered when we are receiving a hello // subREQHello is the handler that is triggered when we are receiving a hello
// message. To keep the state of all the hello's received from nodes we need // message. To keep the state of all the hello's received from nodes we need
// to also start a procFunc that will live as a go routine tied to this process, // to also start a procFunc that will live as a go routine tied to this process,
@ -152,21 +154,22 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, Hello, pf) proc.startup.subscriber(proc, Hello, pf)
} }
if proc.configuration.StartProcesses.IsCentralErrorLogger { fmt.Printf("--------------------------------IsCentralErrorLogger = %v------------------------------\n", proc.configuration.IsCentralErrorLogger)
if proc.configuration.IsCentralErrorLogger {
proc.startup.subscriber(proc, ErrorLog, nil) proc.startup.subscriber(proc, ErrorLog, nil)
} }
if proc.configuration.StartProcesses.StartSubCliCommand { if proc.configuration.StartSubCliCommand {
proc.startup.subscriber(proc, CliCommand, nil) proc.startup.subscriber(proc, CliCommand, nil)
} }
if proc.configuration.StartProcesses.StartSubConsole { if proc.configuration.StartSubConsole {
proc.startup.subscriber(proc, Console, nil) proc.startup.subscriber(proc, Console, nil)
} }
if proc.configuration.StartProcesses.StartPubHello != 0 { if proc.configuration.StartPubHello != 0 {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello)) ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubHello))
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -206,7 +209,7 @@ func (p *processes) Start(proc process) {
proc.startup.publisher(proc, Hello, pf) proc.startup.publisher(proc, Hello, pf)
} }
if proc.configuration.StartProcesses.EnableKeyUpdates { if proc.configuration.EnableKeyUpdates {
// Define the startup of a publisher that will send KeysRequestUpdate // Define the startup of a publisher that will send KeysRequestUpdate
// to central server and ask for publics keys, and to get them deliver back with a request // to central server and ask for publics keys, and to get them deliver back with a request
// of type KeysDeliverUpdate. // of type KeysDeliverUpdate.
@ -257,7 +260,7 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, KeysDeliverUpdate, nil) proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
} }
if proc.configuration.StartProcesses.EnableAclUpdates { if proc.configuration.EnableAclUpdates {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval)) ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
defer ticker.Stop() defer ticker.Stop()
@ -306,7 +309,7 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, AclDeliverUpdate, nil) proc.startup.subscriber(proc, AclDeliverUpdate, nil)
} }
if proc.configuration.StartProcesses.IsCentralAuth { if proc.configuration.IsCentralAuth {
proc.startup.subscriber(proc, KeysRequestUpdate, nil) proc.startup.subscriber(proc, KeysRequestUpdate, nil)
proc.startup.subscriber(proc, KeysAllow, nil) proc.startup.subscriber(proc, KeysAllow, nil)
proc.startup.subscriber(proc, KeysDelete, nil) proc.startup.subscriber(proc, KeysDelete, nil)
@ -324,21 +327,183 @@ func (p *processes) Start(proc process) {
proc.startup.subscriber(proc, AclImport, nil) proc.startup.subscriber(proc, AclImport, nil)
} }
if proc.configuration.StartProcesses.StartSubHttpGet { if proc.configuration.StartSubHttpGet {
proc.startup.subscriber(proc, HttpGet, nil) proc.startup.subscriber(proc, HttpGet, nil)
} }
if proc.configuration.StartProcesses.StartSubTailFile { if proc.configuration.StartSubTailFile {
proc.startup.subscriber(proc, TailFile, nil) proc.startup.subscriber(proc, TailFile, nil)
} }
if proc.configuration.StartProcesses.StartSubCliCommandCont { if proc.configuration.StartSubCliCommandCont {
proc.startup.subscriber(proc, CliCommandCont, nil) proc.startup.subscriber(proc, CliCommandCont, nil)
} }
proc.startup.subscriber(proc, PublicKey, nil) proc.startup.subscriber(proc, PublicKey, nil)
// --------------------------------------------------
// ProcFunc for Jetstream publishers.
// --------------------------------------------------
if proc.configuration.StartJetstreamPublisher {
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("######## DEBUG: Publisher: beginning og jetstream publisher: %v\n", "#######")
js, err := jetstream.New(proc.natsConn)
if err != nil {
log.Fatalf("error: jetstream new failed: %v\n", err)
}
_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
Name: "nodes",
Description: "nodes stream",
Subjects: []string{"nodes.>"},
// Discard older messages and keep only the last one.
// MaxMsgsPerSubject: 1,
})
fmt.Printf("######## DEBUG: Publisher: CreateOrUpdateStream: %v\n", "#######")
if err != nil {
log.Fatalf("error: jetstream create or update failed: %v\n", err)
}
for {
// TODO:
select {
case msg := <-proc.jetstreamOut:
fmt.Printf("######## DEBUG: Publisher: received on <-proc.jetstreamOut: %v\n", msg)
// b, err := proc.messageSerializeAndCompress(msg)
// if err != nil {
// log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
// }
b, err := json.Marshal(msg)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
}
subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
fmt.Printf("######## DEBUG: Publisher: before publish: %v\n", "###")
_, err = js.Publish(proc.ctx, subject, b)
if err != nil {
log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
}
fmt.Printf("######## DEBUG: Publisher: after publish: %v\n", "###")
case <-ctx.Done():
return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
}
}
}
proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
}
// --------------------------------------------------
// Procfunc for Jetstream consumers.
// --------------------------------------------------
// pfJetstreamConsumers connect to a given nats jetstream, and consume messages
// for the node on specified subjects within that stream.
// After a jetstream message is picked up from the stream, the steward message
// will be extracted from the data field, and the ctrl message will be put
// on the local delivery channel, and handled as a normal ctrl message.
if proc.configuration.StartJetstreamConsumer {
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Println("---------------------------------------------------------------")
fmt.Printf("--- DEBUG: consumer: starting up jetstream consumer %v\n", "---")
fmt.Println("---------------------------------------------------------------")
js, err := jetstream.New(proc.natsConn)
if err != nil {
log.Fatalf("error: jetstream new failed: %v\n", err)
}
stream, err := js.Stream(proc.ctx, "nodes")
if err != nil {
log.Printf("error: js.Stream failed: %v\n", err)
}
// stream, err := js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
// Name: "nodes",
// Description: "nodes stream",
// Subjects: []string{"nodes.>"},
// // Discard older messages and keep only the last one.
// MaxMsgsPerSubject: 1,
// })
if err != nil {
log.Printf("error: js.Stream failed: %v\n", err)
}
// Check for more subjects via flags to listen to, and if defined prefix all
// the values with "nodes."
filterSubjectValues := []string{
fmt.Sprintf("nodes.%v", proc.server.nodeName),
//"nodes.all",
}
fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
//// Check if there are more to consume defined in flags/env.
//if proc.configuration.JetstreamsConsume != "" {
// splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
// for i, v := range splitValues {
// filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
// }
//}
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
Name: "nodes_processor",
Durable: "nodes_processor",
FilterSubjects: filterSubjectValues,
})
if err != nil {
log.Fatalf("error: create or update consumer failed: %v\n", err)
}
consumerInfo, _ := fmt.Printf("--- DEBUG: consumer: filterSubjectValues: %v\n", filterSubjectValues)
fmt.Printf("--- DEBUG: consumer: created consumer: %v\n", consumerInfo)
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
fmt.Printf("--- DEBUG: consumer: got jetstream msg to consume: %v\n", msg)
msg.Ack()
stewardMessage := Message{}
// stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
// if err != nil {
// log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
// }
err := json.Unmarshal(msg.Data(), &stewardMessage)
if err != nil {
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
}
log.Printf("----- Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
// Messages received here via jetstream are for this node. Put the message into
// a SubjectAndMessage structure, and we use the deliver local from here.
sam, err := newSubjectAndMessage(stewardMessage)
if err != nil {
log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
}
fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
proc.server.samSendLocalCh <- []subjectAndMessage{sam}
fmt.Print("--- DEBUG : consumer: befor putting on samSendLocalCh\n")
})
if err != nil {
log.Fatalf("error: create or update consumer failed: %v\n", err)
}
defer cctx.Stop()
<-proc.ctx.Done()
return nil
}
proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
}
} }
// --------------------------------------------------
// Stop all subscriber processes. // Stop all subscriber processes.
func (p *processes) Stop() { func (p *processes) Stop() {
log.Printf("info: canceling all subscriber processes...\n") log.Printf("info: canceling all subscriber processes...\n")
@ -348,6 +513,8 @@ func (p *processes) Stop() {
} }
// ---------------------------------------------------------------------------------------
// Helper functions, and other
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
// Startup holds all the startup methods for subscribers. // Startup holds all the startup methods for subscribers.
@ -382,10 +549,10 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
} }
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) proc := newProcess(p.ctx, p.processes.server, sub, streamInfo{}, processKindSubscriberNats)
proc.procFunc = pf proc.procFunc = pf
go proc.spawnWorker() go proc.Start()
} }
// publisher will start a publisher process. It takes the initial process, request method, // publisher will start a publisher process. It takes the initial process, request method,
@ -394,11 +561,11 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
er := fmt.Errorf("starting %v publisher: %#v", m, p.node) er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)
sub := newSubject(m, string(p.node)) sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher) proc := newProcess(p.ctx, p.processes.server, sub, streamInfo{}, processKindPublisherNats)
proc.procFunc = pf proc.procFunc = pf
proc.isLongRunningPublisher = true proc.isLongRunningPublisher = true
go proc.spawnWorker() go proc.Start()
} }
// --------------------------------------------------------------- // ---------------------------------------------------------------

15
request-jetstream.go Normal file
View file

@ -0,0 +1,15 @@
package ctrl
// jetstreamsConsumers will start up the netstream consumers.
// The consumer logic are put in the procFunc.
func jetstreamsConsumers(proc process, message Message, node string) ([]byte, error) {
return []byte{}, nil
}
// jetstreamPublishers will start up the netstream publishers.
// The publisher logic are put in the procFunc.
func jetstreamPublishers(proc process, message Message, node string) ([]byte, error) {
return []byte{}, nil
}

View file

@ -159,6 +159,10 @@ const (
AclExport = "aclExport" AclExport = "aclExport"
// REQAclImport // REQAclImport
AclImport = "aclImport" AclImport = "aclImport"
// Jetstreams Consumers
JetstreamConsumers = "jetstreamConsumers"
// JetstreamPublishers
JetStreamPublishers = "jetstreamPublishers"
) )
type HandlerFunc func(proc process, message Message, node string) ([]byte, error) type HandlerFunc func(proc process, message Message, node string) ([]byte, error)
@ -212,6 +216,8 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
AclExport: HandlerFunc(methodAclExport), AclExport: HandlerFunc(methodAclExport),
AclImport: HandlerFunc(methodAclImport), AclImport: HandlerFunc(methodAclImport),
Test: HandlerFunc(methodTest), Test: HandlerFunc(methodTest),
JetstreamConsumers: HandlerFunc(jetstreamsConsumers),
JetStreamPublishers: HandlerFunc(jetstreamPublishers),
}, },
} }

View file

@ -225,7 +225,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
@ -235,7 +235,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
copySrcSubProc.handler = copySrcSubHandler() copySrcSubProc.handler = copySrcSubHandler()
// The process will be killed when the context expires. // The process will be killed when the context expires.
go copySrcSubProc.spawnWorker() go copySrcSubProc.Start()
// Send a message over the the node where the destination file will be written, // Send a message over the the node where the destination file will be written,
// to also start up a sub process on the destination node. // to also start up a sub process on the destination node.
@ -281,7 +281,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. // newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
p := newProcess(ctx, server, subject, processKind) p := newProcess(ctx, server, subject, streamInfo{}, processKind)
p.isSubProcess = true p.isSubProcess = true
return p return p
@ -333,7 +333,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// previous message is then fully up and running, so we just discard // previous message is then fully up and running, so we just discard
// that second message in those cases. // that second message in those cases.
pn := processNameGet(sub.name(), processKindSubscriber) pn := processNameGet(sub.name(), processKindSubscriberNats)
// fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn)
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
@ -352,7 +352,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
} }
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
@ -362,7 +362,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
copyDstSubProc.handler = copyDstSubHandler() copyDstSubProc.handler = copyDstSubHandler()
// The process will be killed when the context expires. // The process will be killed when the context expires.
go copyDstSubProc.spawnWorker() go copyDstSubProc.Start()
fp := filepath.Join(cia.DstDir, cia.DstFile) fp := filepath.Join(cia.DstDir, cia.DstFile)
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName) replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName)

View file

@ -68,8 +68,8 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
// Create the process and start it. // Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName) sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) procNew := newProcess(proc.ctx, proc.server, sub, streamInfo{}, processKindSubscriberNats)
go procNew.spawnWorker() go procNew.Start()
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf("%v", txt) er := fmt.Errorf("%v", txt)

View file

@ -76,9 +76,8 @@ func newServerForTesting(addressAndPort string, testFolder string) (*server, *Co
conf.SocketFolder = testFolder conf.SocketFolder = testFolder
conf.SubscribersDataFolder = testFolder conf.SubscribersDataFolder = testFolder
conf.DatabaseFolder = testFolder conf.DatabaseFolder = testFolder
conf.StartProcesses.IsCentralErrorLogger = true conf.IsCentralErrorLogger = true
conf.StartProcesses.IsCentralAuth = true conf.IsCentralAuth = true
conf.EnableDebug = false
conf.LogLevel = "none" conf.LogLevel = "none"
ctrlServer, err := NewServer(&conf, "test") ctrlServer, err := NewServer(&conf, "test")

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -49,6 +50,8 @@ type server struct {
// In general the ringbuffer will read this // In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer. // channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan []subjectAndMessage newMessagesCh chan []subjectAndMessage
// jetstreamOutCh
jetstreamOutCh chan Message
// directSAMSCh // directSAMSCh
samSendLocalCh chan []subjectAndMessage samSendLocalCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if // errorKernel is doing all the error handling like what to do if
@ -73,6 +76,8 @@ type server struct {
messageID messageID messageID messageID
// audit logging // audit logging
auditLogCh chan []subjectAndMessage auditLogCh chan []subjectAndMessage
// zstd encoder
zstdEncoder *zstd.Encoder
} }
type messageID struct { type messageID struct {
@ -210,6 +215,21 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
centralAuth := newCentralAuth(configuration, errorKernel) centralAuth := newCentralAuth(configuration, errorKernel)
//} //}
// Prepare the zstd encoder
// Prepare the zstd encoder to put into processInitial
zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
log.Fatalf("error: zstd new encoder failed: %v", err)
}
defer func() {
go func() {
<-ctx.Done()
zstdEncoder.Close()
}()
}()
s := server{ s := server{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -218,6 +238,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
natsConn: conn, natsConn: conn,
ctrlSocket: ctrlSocket, ctrlSocket: ctrlSocket,
newMessagesCh: make(chan []subjectAndMessage), newMessagesCh: make(chan []subjectAndMessage),
jetstreamOutCh: make(chan Message),
samSendLocalCh: make(chan []subjectAndMessage), samSendLocalCh: make(chan []subjectAndMessage),
metrics: metrics, metrics: metrics,
version: version, version: version,
@ -226,6 +247,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
helloRegister: newHelloRegister(), helloRegister: newHelloRegister(),
centralAuth: centralAuth, centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage), auditLogCh: make(chan []subjectAndMessage),
zstdEncoder: zstdEncoder,
} }
s.processes = newProcesses(ctx, &s) s.processes = newProcesses(ctx, &s)
@ -337,7 +359,7 @@ func (s *server) Start() {
// //
// The context of the initial process are set in processes.Start. // The context of the initial process are set in processes.Start.
sub := newSubject(Initial, s.nodeName) sub := newSubject(Initial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s, sub, "") s.processInitial = newProcess(context.TODO(), s, sub, streamInfo{}, "")
// Start all wanted subscriber processes. // Start all wanted subscriber processes.
s.processes.Start(s.processInitial) s.processes.Start(s.processInitial)
@ -364,6 +386,13 @@ func (s *server) Start() {
// startAuditLog will start up the logging of all messages to audit file // startAuditLog will start up the logging of all messages to audit file
func (s *server) startAuditLog(ctx context.Context) { func (s *server) startAuditLog(ctx context.Context) {
// Check if database folder exists, if not create it
if _, err := os.Stat(s.configuration.DatabaseFolder); os.IsNotExist(err) {
err := os.MkdirAll(s.configuration.DatabaseFolder, 0770)
if err != nil {
log.Fatalf("error: failed to create socket folder directory %v: %v", s.configuration.SocketFolder, err)
}
}
storeFile := filepath.Join(s.configuration.DatabaseFolder, "store.log") storeFile := filepath.Join(s.configuration.DatabaseFolder, "store.log")
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0660) f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0660)
@ -384,14 +413,14 @@ func (s *server) startAuditLog(ctx context.Context) {
js, err := json.Marshal(msgForPermStore) js, err := json.Marshal(msgForPermStore)
if err != nil { if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) er := fmt.Errorf("error: startAuditLog: fillBuffer: json marshaling: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError) s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
} }
d := time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n" d := time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
_, err = f.WriteString(d) _, err = f.WriteString(d)
if err != nil { if err != nil {
log.Printf("error:failed to write entry: %v\n", err) log.Printf("error: startAuditLog:failed to write entry: %v\n", err)
} }
} }
case <-ctx.Done(): case <-ctx.Done():
@ -415,7 +444,7 @@ func (s *server) directSAMSChRead() {
// Range over all the sams, find the process, check if the method exists, and // Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler. // handle the message by starting the correct method handler.
for i := range sams { for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) processName := processNameGet(sams[i].Subject.name(), processKindSubscriberNats)
s.processes.active.mu.Lock() s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName] p := s.processes.active.procNames[processName]
@ -488,6 +517,14 @@ func (s *server) routeMessagesToProcess() {
for samSlice := range s.newMessagesCh { for samSlice := range s.newMessagesCh {
for _, sam := range samSlice { for _, sam := range samSlice {
// If the message have the JetstreamToNode field specified
// deliver it via the jet stream processes, and abort trying
// to send it via the normal nats publisher.
if sam.Message.JetstreamToNode != "" {
s.jetstreamOutCh <- sam.Message
continue
}
go func(sam subjectAndMessage) { go func(sam subjectAndMessage) {
s.messageID.mu.Lock() s.messageID.mu.Lock()
s.messageID.id++ s.messageID.id++
@ -516,7 +553,7 @@ func (s *server) routeMessagesToProcess() {
m := sam.Message m := sam.Message
subjName := sam.Subject.name() subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher) pn := processNameGet(subjName, processKindPublisherNats)
sendOK := func() bool { sendOK := func() bool {
var ctxCanceled bool var ctxCanceled bool
@ -572,12 +609,12 @@ func (s *server) routeMessagesToProcess() {
var proc process var proc process
switch { switch {
case m.IsSubPublishedMsg: case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher) proc = newSubProcess(s.ctx, s, sub, processKindPublisherNats)
default: default:
proc = newProcess(s.ctx, s, sub, processKindPublisher) proc = newProcess(s.ctx, s, sub, streamInfo{}, processKindPublisherNats)
} }
proc.spawnWorker() proc.Start()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logDebug(er) s.errorKernel.logDebug(er)