Mirror of https://github.com/postmannen/ctrl.git, synced 2024-12-14 12:37:31 +00:00
Squashed commit of the following:

- implemented reading and embedding the content of a local file with {{CTRL_FILE}}
- fixed error with not-found filename in copy test, removed debug logs
- seedfile: removed deletion, and changed file permissions to 600
- created flags for profiling
- renamed startup.subscriber to startup.startProcess
- created a separate method for helloPublisher
- removed processKind, and removed the not needed file check in copy request
- removed sams from channels
- removed the publisher channel on subject; messages to publish are now published directly from the newMessagesCh
- removed the no longer needed compression and serialization flags; all messaging now uses zstd for compression and cbor for serialization
- added functions for handling cbor serialization and zstd compression, and swapped out json marshaling of jetstream message data with cbor and zstd
- added a flag for the max number of jetstream messages to keep on the broker per subject
Parent: cdf660aa07
Commit: 3a31ced938

19 changed files with 444 additions and 867 deletions
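The commit message above references new helpers for cbor serialization and zstd compression of jetstream message data (the messageSerializeAndCompress / messageDeserializeAndUncompress functions used in the hunks below). The following is only a minimal sketch of that pattern, not the code from this commit; the Message fields are illustrative, but the two libraries are the ones imported in process.go further down in this diff.

```go
// Sketch only: round-trip a message through cbor serialization and zstd
// compression, mirroring the serialize/compress helpers referenced in this
// commit.
package main

import (
	"fmt"

	"github.com/fxamacker/cbor/v2"
	"github.com/klauspost/compress/zstd"
)

// Message is a stand-in with illustrative fields, not the real ctrl Message.
type Message struct {
	ToNode string
	Method string
}

func serializeAndCompress(m Message) ([]byte, error) {
	// cbor first, then zstd over the cbor bytes.
	b, err := cbor.Marshal(m)
	if err != nil {
		return nil, fmt.Errorf("cbor marshal failed: %v", err)
	}
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, fmt.Errorf("zstd new writer failed: %v", err)
	}
	defer enc.Close()
	return enc.EncodeAll(b, nil), nil
}

func deserializeAndUncompress(data []byte) (Message, error) {
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return Message{}, fmt.Errorf("zstd new reader failed: %v", err)
	}
	defer dec.Close()
	b, err := dec.DecodeAll(data, nil)
	if err != nil {
		return Message{}, fmt.Errorf("zstd decode failed: %v", err)
	}
	var m Message
	if err := cbor.Unmarshal(b, &m); err != nil {
		return Message{}, fmt.Errorf("cbor unmarshal failed: %v", err)
	}
	return m, nil
}

func main() {
	b, err := serializeAndCompress(Message{ToNode: "central", Method: "cliCommand"})
	if err != nil {
		fmt.Println(err)
		return
	}
	m, err := deserializeAndUncompress(b)
	fmt.Println(m, err)
}
```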
TODO.md (5 changes)

@@ -1,4 +1,3 @@
TODO:
# TODO

- Create {{ file:<somefilehere> }} to be used within methodArguments
- Add option to send request types with Jetstream.
## Key and ACL updates to use jetstream
@@ -21,20 +21,25 @@ import (

var version string

func main() {
	defer profile.Start(profile.BlockProfile).Stop()
	//defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
	//defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
	//defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop()

	c := ctrl.NewConfiguration()
	// err := c.CheckFlags(version)
	// if err != nil {
	// log.Printf("%v\n", err)
	// return
	// }

	// Start profiling if profiling port is specified
	if c.ProfilingPort != "" {

		switch c.Profiling {
		case "block":
			defer profile.Start(profile.BlockProfile).Stop()
		case "cpu":
			defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
		case "trace":
			defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
		case "mem":
			defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop()
		default:
			log.Fatalf("error: profiling port defined, but no valid profiling type defined. Check --help. Got: %v\n", c.Profiling)
		}

		go func() {
			http.ListenAndServe("localhost:"+c.ProfilingPort, nil)
		}()

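For context, a minimal standalone sketch of the same profiling pattern is shown below. It assumes the profile package is github.com/pkg/profile (whose API matches the calls in the hunk above) and that net/http/pprof is imported for its side effects so the HTTP listener actually serves the pprof endpoints; neither import is visible in this hunk, so treat both as assumptions.

```go
// Sketch only, assuming github.com/pkg/profile: select a profile type from a
// flag-style value and expose the pprof HTTP endpoints on a local port.
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/ handlers on http.DefaultServeMux
	"time"

	"github.com/pkg/profile"
)

func main() {
	profiling := "block"    // would come from the --profiling flag
	profilingPort := "6060" // would come from the --profilingPort flag

	switch profiling {
	case "block":
		defer profile.Start(profile.BlockProfile).Stop()
	case "cpu":
		defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
	default:
		log.Fatalf("unknown profiling type: %v", profiling)
	}

	go func() {
		// nil handler means http.DefaultServeMux, where the pprof handlers live.
		log.Println(http.ListenAndServe("localhost:"+profilingPort, nil))
	}()

	// Stand-in for the rest of the program; the deferred Stop() writes the
	// profile when main returns.
	time.Sleep(10 * time.Second)
}
```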
@ -46,12 +46,16 @@ type Configuration struct {
|
|||
KeysUpdateInterval int `comment:"KeysUpdateInterval in seconds"`
|
||||
// AclUpdateInterval in seconds
|
||||
AclUpdateInterval int `comment:"AclUpdateInterval in seconds"`
|
||||
// The type of profiling
|
||||
Profiling string
|
||||
// The number of the profiling port
|
||||
ProfilingPort string `comment:"The number of the profiling port"`
|
||||
// Host and port for prometheus listener, e.g. localhost:2112
|
||||
PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"`
|
||||
// Comma separated list of additional streams to consume from.
|
||||
JetstreamsConsume string
|
||||
JetstreamsConsume string `comment:"a comma separated list of other jetstream subjects to consume"`
|
||||
// Jetstream MaxMsgsPerSubject
|
||||
JetStreamMaxMsgsPerSubject int `comment:"max messages to keep on the broker for a jetstream subject"`
|
||||
// Set to true if this is the node that should receive the error log's from other nodes
|
||||
DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"`
|
||||
// Default value for how long can a request method max be allowed to run in seconds
|
||||
|
@ -78,10 +82,6 @@ type Configuration struct {
|
|||
ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"`
|
||||
// Retries for error messages
|
||||
ErrorMessageRetries int `comment:"Retries for error messages"`
|
||||
// Compression z for zstd or g for gzip
|
||||
Compression string `comment:"Compression z for zstd or g for gzip"`
|
||||
// Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor
|
||||
Serialization string `comment:"Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor"`
|
||||
// SetBlockProfileRate for block profiling
|
||||
SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"`
|
||||
// EnableSocket for enabling the creation of a ctrl.sock file
|
||||
|
@ -165,9 +165,11 @@ func NewConfiguration() *Configuration {
|
|||
flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", CheckEnv("NATS_RECONNECT_JITTER_TLS", c.NatsReconnectJitterTLS).(int), "default nats ReconnectJitterTLS interval in seconds.")
|
||||
flag.IntVar(&c.KeysUpdateInterval, "keysUpdateInterval", CheckEnv("KEYS_UPDATE_INTERVAL", c.KeysUpdateInterval).(int), "default interval in seconds for asking the central for public keys")
|
||||
flag.IntVar(&c.AclUpdateInterval, "aclUpdateInterval", CheckEnv("ACL_UPDATE_INTERVAL", c.AclUpdateInterval).(int), "default interval in seconds for asking the central for acl updates")
|
||||
flag.StringVar(&c.Profiling, "profiling", CheckEnv("PROFILING", c.Profiling).(string), "type of profiling: cpu/block/trace/mem/heap")
|
||||
flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port")
|
||||
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112")
|
||||
flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstreams to consume from")
|
||||
flag.IntVar(&c.JetStreamMaxMsgsPerSubject, "jetstreamMaxMsgsPerSubject", CheckEnv("JETSTREAM_MAX_MSGS_PER_SUBJECT", c.JetStreamMaxMsgsPerSubject).(int), "max messages to keep on the broker per jetstream subject")
|
||||
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level")
|
||||
flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system")
|
||||
flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run")
|
||||
|
@ -180,8 +182,6 @@ func NewConfiguration() *Configuration {
|
|||
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
|
||||
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out")
|
||||
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it")
|
||||
flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
|
||||
flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
|
||||
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
|
||||
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
|
||||
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
|
||||
|
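Like the existing options, the new flags follow the CheckEnv pattern and can be set either on the command line or through environment variables, for example `--profiling=block --profilingPort=6060` (or `PROFILING=block` and `PROFILING_PORT=6060`), and `--jetstreamMaxMsgsPerSubject=100` (or `JETSTREAM_MAX_MSGS_PER_SUBJECT=100`); the default of 100 is set in newConfigurationDefaults below.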
@ -228,46 +228,46 @@ func NewConfiguration() *Configuration {
|
|||
// Get a Configuration struct with the default values set.
|
||||
func newConfigurationDefaults() Configuration {
|
||||
c := Configuration{
|
||||
ConfigFolder: "./etc/",
|
||||
SocketFolder: "./tmp",
|
||||
ReadFolder: "./readfolder",
|
||||
EnableReadFolder: true,
|
||||
TCPListener: "",
|
||||
HTTPListener: "",
|
||||
DatabaseFolder: "./var/lib",
|
||||
NodeName: "",
|
||||
BrokerAddress: "127.0.0.1:4222",
|
||||
NatsConnOptTimeout: 20,
|
||||
NatsConnectRetryInterval: 10,
|
||||
NatsReconnectJitter: 100,
|
||||
NatsReconnectJitterTLS: 1,
|
||||
KeysUpdateInterval: 60,
|
||||
AclUpdateInterval: 60,
|
||||
ProfilingPort: "",
|
||||
PromHostAndPort: "",
|
||||
JetstreamsConsume: "",
|
||||
DefaultMessageTimeout: 10,
|
||||
DefaultMessageRetries: 1,
|
||||
DefaultMethodTimeout: 10,
|
||||
SubscribersDataFolder: "./data",
|
||||
CentralNodeName: "central",
|
||||
RootCAPath: "",
|
||||
NkeySeedFile: "",
|
||||
NkeyFromED25519SSHKeyFile: "",
|
||||
NkeySeed: "",
|
||||
ExposeDataFolder: "",
|
||||
ErrorMessageTimeout: 60,
|
||||
ErrorMessageRetries: 10,
|
||||
Compression: "z",
|
||||
Serialization: "cbor",
|
||||
SetBlockProfileRate: 0,
|
||||
EnableSocket: true,
|
||||
EnableSignatureCheck: false,
|
||||
EnableAclCheck: false,
|
||||
EnableDebug: false,
|
||||
LogLevel: "debug",
|
||||
LogConsoleTimestamps: false,
|
||||
KeepPublishersAliveFor: 10,
|
||||
ConfigFolder: "./etc/",
|
||||
SocketFolder: "./tmp",
|
||||
ReadFolder: "./readfolder",
|
||||
EnableReadFolder: true,
|
||||
TCPListener: "",
|
||||
HTTPListener: "",
|
||||
DatabaseFolder: "./var/lib",
|
||||
NodeName: "",
|
||||
BrokerAddress: "127.0.0.1:4222",
|
||||
NatsConnOptTimeout: 20,
|
||||
NatsConnectRetryInterval: 10,
|
||||
NatsReconnectJitter: 100,
|
||||
NatsReconnectJitterTLS: 1,
|
||||
KeysUpdateInterval: 60,
|
||||
AclUpdateInterval: 60,
|
||||
Profiling: "",
|
||||
ProfilingPort: "",
|
||||
PromHostAndPort: "",
|
||||
JetstreamsConsume: "",
|
||||
JetStreamMaxMsgsPerSubject: 100,
|
||||
DefaultMessageTimeout: 10,
|
||||
DefaultMessageRetries: 1,
|
||||
DefaultMethodTimeout: 10,
|
||||
SubscribersDataFolder: "./data",
|
||||
CentralNodeName: "central",
|
||||
RootCAPath: "",
|
||||
NkeySeedFile: "",
|
||||
NkeyFromED25519SSHKeyFile: "",
|
||||
NkeySeed: "",
|
||||
ExposeDataFolder: "",
|
||||
ErrorMessageTimeout: 60,
|
||||
ErrorMessageRetries: 10,
|
||||
SetBlockProfileRate: 0,
|
||||
EnableSocket: true,
|
||||
EnableSignatureCheck: false,
|
||||
EnableAclCheck: false,
|
||||
EnableDebug: false,
|
||||
LogLevel: "debug",
|
||||
LogConsoleTimestamps: false,
|
||||
KeepPublishersAliveFor: 10,
|
||||
|
||||
StartProcesses: StartProcesses{
|
||||
StartPubHello: 30,
|
||||
|
|
|
@@ -15,7 +15,8 @@

- [Request Methods](./core_request_methods.md)
- [Nats timeouts](./core_nats_timeouts.md)
- [Startup folder](./core_startup_folder.md)
- [{{ctrl_DATA}} variable](./core_messaging_ctrl_DATA.md)
- [{{CTRL_DATA}} variable](./core_messaging_CTRL_DATA.md)
- [{{CTRL_FILE}} variable](./core_messaging_CTRL_FILE.md)
- [Errors](./core_errors.md)

# Example standard messages
doc/src/core_messaging_CTRL_FILE.md (new file, 23 lines)

@@ -0,0 +1,23 @@
# Message methodArgs variables

## {{CTRL_FILE}}

Read a local text file, and embed the content of the file into the methodArgs.

```yaml
---
- toNodes:
    - btdev1
  #jetstreamToNode: btdev1
  method: cliCommand
  methodArgs:
    - /bin/bash
    - -c
    - |
      echo {{CTRL_FILE:/some_directory/source_file.yaml}}>/other_directory/destination_file.yaml
  methodTimeout: 3
  replyMethod: console
  ACKTimeout: 0
```

Before the message is sent, the content of the file `/some_directory/source_file.yaml` is read and embedded into the methodArgs. When the message is received at its destination node and the cliCommand is executed, the content is written to `/other_directory/destination_file.yaml`.
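A rough idea of what the substitution involves is sketched below. This is only an illustration with hypothetical helper names; the actual implementation of {{CTRL_FILE}} handling is not part of the excerpt shown in this commit.

```go
// Illustration only: expand a {{CTRL_FILE:<path>}} token in a methodArgs
// string by reading the local file and embedding its content.
package main

import (
	"fmt"
	"os"
	"regexp"
)

var ctrlFileRe = regexp.MustCompile(`\{\{CTRL_FILE:([^}]+)\}\}`)

func expandCtrlFile(arg string) (string, error) {
	var expandErr error
	out := ctrlFileRe.ReplaceAllStringFunc(arg, func(match string) string {
		path := ctrlFileRe.FindStringSubmatch(match)[1]
		b, err := os.ReadFile(path)
		if err != nil {
			expandErr = fmt.Errorf("ctrl_file: reading %q failed: %v", path, err)
			return match
		}
		return string(b)
	})
	return out, expandErr
}

func main() {
	arg := "echo {{CTRL_FILE:/some_directory/source_file.yaml}}>/other_directory/destination_file.yaml"
	expanded, err := expandCtrlFile(arg)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(expanded)
}
```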
doc/src/core_messaging_methodargs_variables.md (new file, 1 line)

@@ -0,0 +1 @@
# Message methodArgs variables
@ -68,7 +68,7 @@ const logNone logLevel = "none"
|
|||
// process if it should continue or not, based on how severe
// the error was. This should be right after sending the error
// in the process.
|
||||
func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
|
||||
func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error {
|
||||
// Initiate the slog logger.
|
||||
var replaceFunc func(groups []string, a slog.Attr) slog.Attr
|
||||
if !e.configuration.LogConsoleTimestamps {
|
||||
|
@ -131,13 +131,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
|
|||
Retries: errEvent.process.configuration.ErrorMessageRetries,
|
||||
}
|
||||
|
||||
sam := subjectAndMessage{
|
||||
Subject: newSubject(ErrorLog, "errorCentral"),
|
||||
Message: m,
|
||||
}
|
||||
|
||||
// Put the message on the channel to the ringbuffer.
|
||||
ringBufferBulkInCh <- sam
|
||||
ringBufferBulkInCh <- m
|
||||
|
||||
// if errEvent.process.configuration.EnableDebug {
|
||||
// log.Printf("%v\n", er)
|
||||
|
|
|
@ -101,11 +101,6 @@ type Subject struct {
|
|||
ToNode string `json:"node" yaml:"toNode"`
|
||||
// method, what is this message doing, etc. CLICommand, Syslog, etc.
|
||||
Method Method `json:"method" yaml:"method"`
|
||||
// messageCh is used by publisher kind processes to read new messages
|
||||
// to be published. The content on this channel have been routed here
|
||||
// from routeMessagesToPublish in *server.
|
||||
// This channel is only used for publishing processes.
|
||||
messageCh chan Message
|
||||
}
|
||||
|
||||
// newSubject will return a new variable of the type subject, and insert
|
||||
|
@ -124,9 +119,8 @@ func newSubject(method Method, node string) Subject {
|
|||
}
|
||||
|
||||
return Subject{
|
||||
ToNode: node,
|
||||
Method: method,
|
||||
messageCh: make(chan Message),
|
||||
ToNode: node,
|
||||
Method: method,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,9 +133,8 @@ func newSubjectNoVerifyHandler(method Method, node string) Subject {
|
|||
// Get the Event type for the Method.
|
||||
|
||||
return Subject{
|
||||
ToNode: node,
|
||||
Method: method,
|
||||
messageCh: make(chan Message),
|
||||
ToNode: node,
|
||||
Method: method,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ func (s *server) readStartupFolder() {
|
|||
readBytes = bytes.Trim(readBytes, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
messages, err := s.convertBytesToMessages(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
|
@ -85,38 +85,35 @@ func (s *server) readStartupFolder() {
|
|||
}
|
||||
|
||||
// Check if fromNode field is specified, and remove the message if blank.
|
||||
for i := range sams {
|
||||
for i := range messages {
|
||||
// We want to allow the use of nodeName local only in startup folder, and
|
||||
// if used we substitute it for the local node name.
|
||||
if sams[i].Message.ToNode == "local" {
|
||||
sams[i].Message.ToNode = Node(s.nodeName)
|
||||
sams[i].Subject.ToNode = s.nodeName
|
||||
if messages[i].ToNode == "local" {
|
||||
messages[i].ToNode = Node(s.nodeName)
|
||||
}
|
||||
|
||||
switch {
|
||||
case sams[i].Message.FromNode == "":
|
||||
// Remove the first message from the slice.
|
||||
sams = append(sams[:i], sams[i+1:]...)
|
||||
case messages[i].FromNode == "":
|
||||
er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message")
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
continue
|
||||
|
||||
case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0:
|
||||
// Remove the first message from the slice.
|
||||
sams = append(sams[:i], sams[i+1:]...)
|
||||
case messages[i].ToNode == "" && len(messages[i].ToNodes) == 0:
|
||||
er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message")
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
j, err := json.MarshalIndent(sams, "", " ")
|
||||
j, err := json.MarshalIndent(messages, "", " ")
|
||||
if err != nil {
|
||||
log.Printf("test error: %v\n", err)
|
||||
}
|
||||
er = fmt.Errorf("%v", string(j))
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
||||
|
||||
s.samSendLocalCh <- sams
|
||||
s.messageDeliverLocalCh <- messages
|
||||
|
||||
}
|
||||
|
||||
|
@ -128,29 +125,28 @@ func (s *server) jetstreamPublish() {
|
|||
|
||||
// Create a stream
|
||||
_, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{
|
||||
Name: "NODES",
|
||||
Subjects: []string{"NODES.>"},
|
||||
// TODO: Create Flag ?
|
||||
MaxMsgsPerSubject: 100,
|
||||
// MaxMsgsPerSubject: 1,
|
||||
Name: "NODES",
|
||||
Subjects: []string{"NODES.>"},
|
||||
MaxMsgsPerSubject: int64(s.configuration.JetStreamMaxMsgsPerSubject),
|
||||
})
|
||||
|
||||
// Publish messages.
|
||||
for {
|
||||
select {
|
||||
case jsMSG := <-s.jetstreamPublishCh:
|
||||
b, err := json.Marshal(jsMSG)
|
||||
case msg := <-s.jetstreamPublishCh:
|
||||
|
||||
b, err := s.messageSerializeAndCompress(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
|
||||
}
|
||||
|
||||
subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode))
|
||||
subject := string(fmt.Sprintf("NODES.%v", msg.JetstreamToNode))
|
||||
_, err = js.Publish(s.ctx, subject, b)
|
||||
if err != nil {
|
||||
log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG)
|
||||
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, msg)
|
||||
case <-s.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
@ -183,7 +179,7 @@ func (s *server) jetstreamConsume() {
|
|||
}
|
||||
}
|
||||
|
||||
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues)
|
||||
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %v", filterSubjectValues)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
||||
|
||||
cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{
|
||||
|
@ -201,10 +197,9 @@ func (s *server) jetstreamConsume() {
|
|||
|
||||
msg.Ack()
|
||||
|
||||
m := Message{}
|
||||
err := json.Unmarshal(msg.Data(), &m)
|
||||
m, err := s.messageDeserializeAndUncompress(msg.Data())
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err)
|
||||
er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
||||
return
|
||||
}
|
||||
|
@ -214,15 +209,7 @@ func (s *server) jetstreamConsume() {
|
|||
// nodeName of the consumer in the ctrl Message, so we are sure it is handled locally.
|
||||
m.ToNode = Node(s.nodeName)
|
||||
|
||||
sam, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
// If a message is received via
|
||||
s.samSendLocalCh <- []subjectAndMessage{sam}
|
||||
s.messageDeliverLocalCh <- []Message{m}
|
||||
})
|
||||
defer consumeContext.Stop()
|
||||
|
||||
|
@ -302,30 +289,30 @@ func (s *server) readSocket() {
|
|||
readBytes = bytes.Trim(readBytes, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
messages, err := s.convertBytesToMessages(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range sams {
|
||||
for i := range messages {
|
||||
|
||||
// Fill in the value for the FromNode field, so the receiver
|
||||
// can check this field to know where it came from.
|
||||
sams[i].Message.FromNode = Node(s.nodeName)
|
||||
messages[i].FromNode = Node(s.nodeName)
|
||||
|
||||
// Send an info message to the central about the message picked
|
||||
// for auditing.
|
||||
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
|
||||
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, messages[i])
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
||||
|
||||
s.newMessagesCh <- sams[i]
|
||||
s.newMessagesCh <- messages[i]
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
|
||||
s.auditLogCh <- sams
|
||||
s.auditLogCh <- messages
|
||||
|
||||
}(conn)
|
||||
}
|
||||
|
@ -381,47 +368,45 @@ func (s *server) readFolder() {
|
|||
}
|
||||
fh.Close()
|
||||
|
||||
fmt.Printf("------- DEBUG: %v\n", b)
|
||||
|
||||
b = bytes.Trim(b, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(b)
|
||||
messages, err := s.convertBytesToMessages(b)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range sams {
|
||||
for i := range messages {
|
||||
|
||||
// Fill in the value for the FromNode field, so the receiver
|
||||
// can check this field to know where it came from.
|
||||
sams[i].Message.FromNode = Node(s.nodeName)
|
||||
messages[i].FromNode = Node(s.nodeName)
|
||||
|
||||
// Send an info message to the central about the message picked
|
||||
// for auditing.
|
||||
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
|
||||
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, messages[i])
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
|
||||
// Check if it is a message to publish with Jetstream.
|
||||
if sams[i].Message.JetstreamToNode != "" {
|
||||
if messages[i].JetstreamToNode != "" {
|
||||
|
||||
s.jetstreamPublishCh <- sams[i].Message
|
||||
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams)
|
||||
s.jetstreamPublishCh <- messages[i]
|
||||
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", messages)
|
||||
s.errorKernel.logDebug(er)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
s.newMessagesCh <- sams[i]
|
||||
s.newMessagesCh <- messages[i]
|
||||
|
||||
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
|
||||
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", messages)
|
||||
s.errorKernel.logDebug(er)
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
s.auditLogCh <- sams
|
||||
s.auditLogCh <- messages
|
||||
|
||||
// Delete the file.
|
||||
err = os.Remove(event.Name)
|
||||
|
@ -498,23 +483,23 @@ func (s *server) readTCPListener() {
|
|||
readBytes = bytes.Trim(readBytes, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
messages, err := s.convertBytesToMessages(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range sams {
|
||||
for i := range messages {
|
||||
|
||||
// Fill in the value for the FromNode field, so the receiver
|
||||
// can check this field to know where it came from.
|
||||
sams[i].Message.FromNode = Node(s.nodeName)
|
||||
s.newMessagesCh <- sams[i]
|
||||
messages[i].FromNode = Node(s.nodeName)
|
||||
s.newMessagesCh <- messages[i]
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
s.auditLogCh <- sams
|
||||
s.auditLogCh <- messages
|
||||
|
||||
}(conn)
|
||||
}
|
||||
|
@ -543,23 +528,23 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
|||
readBytes = bytes.Trim(readBytes, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
messages, err := s.convertBytesToMessages(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range sams {
|
||||
for i := range messages {
|
||||
|
||||
// Fill in the value for the FromNode field, so the receiver
|
||||
// can check this field to know where it came from.
|
||||
sams[i].Message.FromNode = Node(s.nodeName)
|
||||
s.newMessagesCh <- sams[i]
|
||||
messages[i].FromNode = Node(s.nodeName)
|
||||
s.newMessagesCh <- messages[i]
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
s.auditLogCh <- sams
|
||||
s.auditLogCh <- messages
|
||||
|
||||
}
|
||||
|
||||
|
@ -583,21 +568,11 @@ func (s *server) readHttpListener() {
|
|||
}()
|
||||
}
|
||||
|
||||
// The subject are made up of different parts of the message field.
|
||||
// To make things easier and to avoid figuring out what the subject
|
||||
// is in all places we've created the concept of subjectAndMessage
|
||||
// (sam) where we get the subject for the message once, and use the
|
||||
// sam structure with subject alongside the message instead.
|
||||
type subjectAndMessage struct {
|
||||
Subject `json:"subject" yaml:"subject"`
|
||||
Message `json:"message" yaml:"message"`
|
||||
}
|
||||
|
||||
// convertBytesToSAMs will range over the byte representing a message given in
|
||||
// json format. For each element found the Message type will be converted into
|
||||
// a SubjectAndMessage type value and appended to a slice, and the slice is
|
||||
// returned to the caller.
|
||||
func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
||||
func (s *server) convertBytesToMessages(b []byte) ([]Message, error) {
|
||||
MsgSlice := []Message{}
|
||||
|
||||
err := yaml.Unmarshal(b, &MsgSlice)
|
||||
|
@ -609,22 +584,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
|||
MsgSlice = s.checkMessageToNodes(MsgSlice)
|
||||
s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice)))
|
||||
|
||||
sam := []subjectAndMessage{}
|
||||
|
||||
// Range over all the messages parsed from json, and create a subject for
|
||||
// each message.
|
||||
for _, m := range MsgSlice {
|
||||
sm, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, m, er, logWarning)
|
||||
|
||||
continue
|
||||
}
|
||||
sam = append(sam, sm)
|
||||
}
|
||||
|
||||
return sam, nil
|
||||
return MsgSlice, nil
|
||||
}
|
||||
|
||||
// checkMessageToNodes will check that either toHost or toHosts are
|
||||
|
@ -668,37 +628,3 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
|
|||
|
||||
return msgs
|
||||
}
|
||||
|
||||
// newSubjectAndMessage will look up the correct values and value types to
|
||||
// be used in a subject for a Message (sam), and return a combined structure
|
||||
// of type subjectAndMessage.
|
||||
func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
|
||||
// We need to create a temporary method type to look up the kind for the
|
||||
// real method for the message.
|
||||
var mt Method
|
||||
|
||||
tmpH := mt.getHandler(m.Method)
|
||||
if tmpH == nil {
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method)
|
||||
}
|
||||
|
||||
switch {
|
||||
case m.ToNode == "":
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m)
|
||||
case m.Method == "":
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m)
|
||||
}
|
||||
|
||||
sub := Subject{
|
||||
ToNode: string(m.ToNode),
|
||||
Method: m.Method,
|
||||
messageCh: make(chan Message),
|
||||
}
|
||||
|
||||
sam := subjectAndMessage{
|
||||
Subject: sub,
|
||||
Message: m,
|
||||
}
|
||||
|
||||
return sam, nil
|
||||
}
|
||||
|
|
372
process.go
372
process.go
|
@ -1,44 +1,23 @@
|
|||
package ctrl
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
// "google.golang.org/protobuf/internal/errors"
|
||||
)
|
||||
|
||||
// processKind are either kindSubscriber or kindPublisher, and are
|
||||
// used to distinguish the kind of process to spawn and to know
|
||||
// the process kind put in the process map.
|
||||
type processKind string
|
||||
|
||||
const (
|
||||
processKindSubscriber processKind = "subscriber"
|
||||
processKindPublisher processKind = "publisher"
|
||||
)
|
||||
|
||||
// process holds all the logic to handle a message type and its
// method, subscription/publishing messages for a subject, and more.
|
||||
type process struct {
|
||||
// isSubProcess is used to identify subprocesses spawned by other processes.
|
||||
isSubProcess bool
|
||||
// isLongRunningPublisher is set to true for a publisher service that should not
|
||||
// be auto terminated like a normal autospawned publisher would be when the
// inactivity timeout has expired
|
||||
isLongRunningPublisher bool
|
||||
// server
|
||||
server *server
|
||||
// messageID
|
||||
|
@ -50,8 +29,7 @@ type process struct {
|
|||
// Put a node here to be able know the node a process is at.
|
||||
node Node
|
||||
// The processID for the current process
|
||||
processID int
|
||||
processKind processKind
|
||||
processID int
|
||||
// methodsAvailable
|
||||
methodsAvailable MethodsAvailable
|
||||
// procFunc is a function that will be started when a worker process
|
||||
|
@ -88,7 +66,7 @@ type process struct {
|
|||
// copy of the configuration from server
|
||||
configuration *Configuration
|
||||
// The new messages channel copied from *Server
|
||||
newMessagesCh chan<- subjectAndMessage
|
||||
newMessagesCh chan<- Message
|
||||
// The structure who holds all processes information
|
||||
processes *processes
|
||||
// nats connection
|
||||
|
@ -104,7 +82,7 @@ type process struct {
|
|||
|
||||
// handler is used to directly attach a handler to a process upon
|
||||
// creation of the process, like when a process is spawning a sub
|
||||
// process like REQCopySrc do. If we're not spawning a sub process
|
||||
// process like copySrc do. If we're not spawning a sub process
|
||||
// and it is a regular process the handler to use is found with the
|
||||
// getHandler method
|
||||
handler func(proc process, message Message, node string) ([]byte, error)
|
||||
|
@ -124,7 +102,7 @@ type process struct {
|
|||
|
||||
// prepareNewProcess will set the provided values and the default
|
||||
// values for a process.
|
||||
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
|
||||
func newProcess(ctx context.Context, server *server, subject Subject) process {
|
||||
// create the initial configuration for a sessions communicating with 1 host process.
|
||||
server.processes.mu.Lock()
|
||||
server.processes.lastProcessID++
|
||||
|
@ -141,7 +119,6 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
subject: subject,
|
||||
node: Node(server.configuration.NodeName),
|
||||
processID: pid,
|
||||
processKind: processKind,
|
||||
methodsAvailable: method.GetMethodsAvailable(),
|
||||
newMessagesCh: server.newMessagesCh,
|
||||
configuration: server.configuration,
|
||||
|
@ -158,14 +135,8 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
|
||||
// We use the full name of the subject to identify a unique
|
||||
// process. We can do that since a process can only handle
|
||||
// one message queue.
|
||||
|
||||
if proc.processKind == processKindPublisher {
|
||||
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
|
||||
}
|
||||
if proc.processKind == processKindSubscriber {
|
||||
proc.processName = processNameGet(proc.subject.name(), processKindSubscriber)
|
||||
}
|
||||
// one request type.
|
||||
proc.processName = processNameGet(proc.subject.name())
|
||||
|
||||
return proc
|
||||
}
|
||||
|
@ -177,24 +148,16 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
|||
//
|
||||
// It will give the process the next available ID, and also add the
|
||||
// process to the processes map in the server structure.
|
||||
func (p process) spawnWorker() {
|
||||
func (p process) start() {
|
||||
|
||||
// Add prometheus metrics for the process.
|
||||
if !p.isSubProcess {
|
||||
p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)})
|
||||
}
|
||||
|
||||
// Start a publisher worker, which will start a go routine (process)
|
||||
// That will take care of all the messages for the subject it owns.
|
||||
if p.processKind == processKindPublisher {
|
||||
p.startPublisher()
|
||||
}
|
||||
|
||||
// Start a subscriber worker, which will start a go routine (process)
|
||||
// That will take care of all the messages for the subject it owns.
|
||||
if p.processKind == processKindSubscriber {
|
||||
p.startSubscriber()
|
||||
}
|
||||
// to handle executing the request method defined in the message.
|
||||
p.startSubscriber()
|
||||
|
||||
// Add information about the new process to the started processes map.
|
||||
p.processes.active.mu.Lock()
|
||||
|
@ -205,27 +168,6 @@ func (p process) spawnWorker() {
|
|||
p.errorKernel.logDebug(er)
|
||||
}
|
||||
|
||||
func (p process) startPublisher() {
|
||||
// If there is a procFunc for the process, start it.
|
||||
if p.procFunc != nil {
|
||||
// Initialize the channel for communication between the proc and
|
||||
// the procFunc.
|
||||
p.procFuncCh = make(chan Message)
|
||||
|
||||
// Start the procFunc in it's own anonymous func so we are able
|
||||
// to get the return error.
|
||||
go func() {
|
||||
err := p.procFunc(p.ctx, p.procFuncCh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go p.publishMessages(p.natsConn)
|
||||
}
|
||||
|
||||
func (p process) startSubscriber() {
|
||||
// If there is a procFunc for the process, start it.
|
||||
if p.procFunc != nil {
|
||||
|
@ -244,7 +186,7 @@ func (p process) startSubscriber() {
|
|||
}()
|
||||
}
|
||||
|
||||
p.natsSubscription = p.subscribeMessages()
|
||||
p.natsSubscription = p.startNatsSubscriber()
|
||||
|
||||
// We also need to be able to remove all the information about this process
|
||||
// when the process context is canceled.
|
||||
|
@ -271,26 +213,29 @@ var (
|
|||
ErrACKSubscribeRetry = errors.New("ctrl: retrying to subscribe for ack message")
|
||||
)
|
||||
|
||||
// messageDeliverNats will create the Nats message with headers and payload.
|
||||
// It will also take care of the delivering the message that is converted to
|
||||
// gob or cbor format as a nats.Message. It will also take care of checking
|
||||
// timeouts and retries specified for the message.
|
||||
func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
|
||||
// publishNats will create the Nats message with headers and payload.
|
||||
// The payload of the nats message, which is the ctrl message, will be
// serialized and compressed before being put in the data field of the nats
// message.
// It will also take care of resending if not delivered, and of timeouts.
|
||||
func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
|
||||
retryAttempts := 0
|
||||
|
||||
if message.RetryWait <= 0 {
|
||||
message.RetryWait = 0
|
||||
}
|
||||
|
||||
subject := newSubject(message.Method, string(message.ToNode))
|
||||
|
||||
// The for loop will run until the message is delivered successfully,
|
||||
// or that retries are reached.
|
||||
for {
|
||||
msg := &nats.Msg{
|
||||
Subject: string(p.subject.name()),
|
||||
Subject: string(subject.name()),
|
||||
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"),
|
||||
// Structure of the reply message are:
|
||||
// <nodename>.<message type>.<method>.reply
|
||||
Reply: fmt.Sprintf("%s.reply", p.subject.name()),
|
||||
Reply: fmt.Sprintf("%s.reply", subject.name()),
|
||||
Data: natsMsgPayload,
|
||||
Header: natsMsgHeader,
|
||||
}
|
||||
|
@ -390,7 +335,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
|
||||
switch {
|
||||
case err == nats.ErrNoResponders || err == nats.ErrTimeout:
|
||||
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
|
||||
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, subject.name(), err)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||
|
@ -399,13 +344,13 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
return ErrACKSubscribeRetry
|
||||
|
||||
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
|
||||
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
|
||||
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", subject.name(), err)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
return er
|
||||
|
||||
default:
|
||||
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", p.subject.name(), err)
|
||||
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", subject.name(), err)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
return er
|
||||
|
@ -442,7 +387,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
// kind of message it is and then it will check how to handle that message type,
|
||||
// and then call the correct method handler for it.
|
||||
//
|
||||
// This handler function should be started in it's own go routine,so
|
||||
// This function should be started in its own goroutine, so
|
||||
// one individual handler is started per message received so we can keep
|
||||
// the state of the message being processed, and then reply back to the
|
||||
// correct sending process's reply, meaning so we ACK back to the correct
|
||||
|
@ -462,90 +407,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
p.errorKernel.logDebug(er)
|
||||
}
|
||||
|
||||
// If compression is used, decompress it to get the gob data. If
|
||||
// compression is not used it is the gob encoded data we already
|
||||
// got in msgData so we do nothing with it.
|
||||
if val, ok := msg.Header["cmp"]; ok {
|
||||
switch val[0] {
|
||||
case "z":
|
||||
zr, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
msgData, err = zr.DecodeAll(msg.Data, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd decoding failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
zr.Close()
|
||||
return
|
||||
}
|
||||
|
||||
zr.Close()
|
||||
|
||||
case "g":
|
||||
r := bytes.NewReader(msgData)
|
||||
gr, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(gr)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
gr.Close()
|
||||
|
||||
msgData = b
|
||||
}
|
||||
}
|
||||
|
||||
message := Message{}
|
||||
|
||||
// TODO: Jetstream
|
||||
// Use CBOR and Compression for all messages, and drop the use of the header fields.
|
||||
|
||||
// Check if serialization is specified.
|
||||
// Will default to gob serialization if nothing or non existing value is specified.
|
||||
if val, ok := msg.Header["serial"]; ok {
|
||||
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
|
||||
switch val[0] {
|
||||
case "cbor":
|
||||
err := cbor.Unmarshal(msgData, &message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
default: // Deaults to gob if no match was found.
|
||||
r := bytes.NewReader(msgData)
|
||||
gobDec := gob.NewDecoder(r)
|
||||
|
||||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Default to gob if serialization flag was not specified.
|
||||
r := bytes.NewReader(msgData)
|
||||
gobDec := gob.NewDecoder(r)
|
||||
|
||||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
message, err := p.server.messageDeserializeAndUncompress(msgData)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageSubscriberHandler: deserialize and uncompress failed: %v", err)
|
||||
// p.errorKernel.logDebug(er)
|
||||
log.Fatalf("%v\n", er)
|
||||
}
|
||||
|
||||
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
|
||||
|
@ -778,7 +644,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
|
|||
// SubscribeMessage will register the Nats callback function for the specified
|
||||
// nats subject. This allows us to receive Nats messages for a given subject
|
||||
// on a node.
|
||||
func (p process) subscribeMessages() *nats.Subscription {
|
||||
func (p process) startNatsSubscriber() *nats.Subscription {
|
||||
subject := string(p.subject.name())
|
||||
// natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
|
||||
natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
|
||||
|
@ -796,82 +662,6 @@ func (p process) subscribeMessages() *nats.Subscription {
|
|||
return natsSubscription
|
||||
}
|
||||
|
||||
// publishMessages will do the publishing of messages for one single
|
||||
// process. The function should be run as a goroutine, and will run
|
||||
// as long as the process it belongs to is running.
|
||||
func (p process) publishMessages(natsConn *nats.Conn) {
|
||||
var once sync.Once
|
||||
|
||||
var zEnc *zstd.Encoder
|
||||
// Prepare a zstd encoder if enabled. By enabling it here before
|
||||
// looping over the messages to send below, we can reuse the zstd
|
||||
// encoder for all messages.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
|
||||
p.errorKernel.logError(er)
|
||||
os.Exit(1)
|
||||
}
|
||||
zEnc = enc
|
||||
defer zEnc.Close()
|
||||
|
||||
}
|
||||
|
||||
// Adding a timer that will be used for when to remove the sub process
|
||||
// publisher. The timer is reset each time a message is published with
|
||||
// the process, so the sub process publisher will not be removed until
|
||||
// it have not received any messages for the given amount of time.
|
||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
|
||||
// Wait and read the next message on the message channel, or
|
||||
// exit this function if Cancel are received via ctx.
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if p.isLongRunningPublisher {
|
||||
er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// We only want to remove subprocesses
|
||||
// REMOVED 120123: Removed if so all publishers should be canceled if inactive.
|
||||
//if p.isSubProcess {
|
||||
p.processes.active.mu.Lock()
|
||||
p.ctxCancel()
|
||||
delete(p.processes.active.procNames, p.processName)
|
||||
p.processes.active.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf("info: canceled publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
return
|
||||
//}
|
||||
|
||||
case m := <-p.subject.messageCh:
|
||||
ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
|
||||
// Sign the methodArgs, and add the signature to the message.
|
||||
m.ArgSignature = p.addMethodArgSignature(m)
|
||||
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
|
||||
|
||||
go p.publishAMessage(m, zEnc, &once, natsConn)
|
||||
case <-p.ctx.Done():
|
||||
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p process) addMethodArgSignature(m Message) []byte {
|
||||
argsString := argsToString(m.MethodArgs)
|
||||
sign := ed25519.Sign(p.nodeAuth.SignPrivateKey, []byte(argsString))
|
||||
|
@ -879,108 +669,26 @@ func (p process) addMethodArgSignature(m Message) []byte {
|
|||
return sign
|
||||
}
|
||||
|
||||
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) {
|
||||
func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
|
||||
// Create the initial header, and set values below depending on the
|
||||
// various configuration options chosen.
|
||||
natsMsgHeader := make(nats.Header)
|
||||
natsMsgHeader["fromNode"] = []string{string(p.node)}
|
||||
|
||||
// The serialized value of the nats message payload
|
||||
var natsMsgPayloadSerialized []byte
|
||||
|
||||
// encode the message structure into gob binary format before putting
|
||||
// it into a nats message.
|
||||
// Prepare a gob encoder with a buffer before we start the loop
|
||||
switch p.configuration.Serialization {
|
||||
case "cbor":
|
||||
b, err := cbor.Marshal(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
natsMsgPayloadSerialized = b
|
||||
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
|
||||
|
||||
default:
|
||||
var bufGob bytes.Buffer
|
||||
gobEnc := gob.NewEncoder(&bufGob)
|
||||
err := gobEnc.Encode(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
natsMsgPayloadSerialized = bufGob.Bytes()
|
||||
natsMsgHeader["serial"] = []string{"gob"}
|
||||
}
|
||||
|
||||
// Get the process name so we can look up the process in the
|
||||
// processes map, and increment the message counter.
|
||||
pn := processNameGet(p.subject.name(), processKindPublisher)
|
||||
|
||||
// The compressed value of the nats message payload. The content
|
||||
// can either be compressed or in it's original form depening on
|
||||
// the outcome of the switch below, and if compression were chosen
|
||||
// or not.
|
||||
var natsMsgPayloadCompressed []byte
|
||||
|
||||
// Compress the data payload if selected with configuration flag.
|
||||
// The compression chosen is later set in the nats msg header when
|
||||
// calling p.messageDeliverNats below.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
// p.zEncMutex.Lock()
|
||||
// zEnc.Reset(nil)
|
||||
// p.zEncMutex.Unlock()
|
||||
|
||||
case "g": // gzip
|
||||
var buf bytes.Buffer
|
||||
func() {
|
||||
gzipW := gzip.NewWriter(&buf)
|
||||
defer gzipW.Close()
|
||||
defer gzipW.Flush()
|
||||
_, err := gzipW.Write(natsMsgPayloadSerialized)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to write gzip: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
natsMsgPayloadCompressed = buf.Bytes()
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
case "": // no compression
|
||||
natsMsgPayloadCompressed = natsMsgPayloadSerialized
|
||||
natsMsgHeader["cmp"] = []string{"none"}
|
||||
|
||||
default: // no compression
|
||||
// Allways log the error to console.
|
||||
er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
|
||||
b, err := p.server.messageSerializeAndCompress(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: publishAMessage: serialize and compress failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
// We only wan't to send the error message to errorCentral once.
|
||||
once.Do(func() {
|
||||
p.errorKernel.logDebug(er)
|
||||
})
|
||||
|
||||
// No compression, so we just assign the value of the serialized
|
||||
// data directly to the variable used with messageDeliverNats.
|
||||
natsMsgPayloadCompressed = natsMsgPayloadSerialized
|
||||
natsMsgHeader["cmp"] = []string{"none"}
|
||||
return
|
||||
}
|
||||
|
||||
// Create the Nats message with headers and payload, and do the
|
||||
// sending of the message.
|
||||
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
|
||||
p.publishNats(b, natsMsgHeader, natsConn, m)
|
||||
|
||||
// Get the process name so we can look up the process in the
|
||||
// processes map, and increment the message counter.
|
||||
pn := processNameGet(p.subject.name())
|
||||
// Increment the counter for the next message to be sent.
|
||||
p.messageID++
|
||||
|
||||
|
|
119
processes.go
119
processes.go
|
@ -92,25 +92,25 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
// --- Subscriber services that can be started via flags
|
||||
|
||||
proc.startup.subscriber(proc, OpProcessList, nil)
|
||||
proc.startup.subscriber(proc, OpProcessStart, nil)
|
||||
proc.startup.subscriber(proc, OpProcessStop, nil)
|
||||
proc.startup.subscriber(proc, Test, nil)
|
||||
proc.startup.startProcess(proc, OpProcessList, nil)
|
||||
proc.startup.startProcess(proc, OpProcessStart, nil)
|
||||
proc.startup.startProcess(proc, OpProcessStop, nil)
|
||||
proc.startup.startProcess(proc, Test, nil)
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubFileAppend {
|
||||
proc.startup.subscriber(proc, FileAppend, nil)
|
||||
proc.startup.startProcess(proc, FileAppend, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubFile {
|
||||
proc.startup.subscriber(proc, File, nil)
|
||||
proc.startup.startProcess(proc, File, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCopySrc {
|
||||
proc.startup.subscriber(proc, CopySrc, nil)
|
||||
proc.startup.startProcess(proc, CopySrc, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCopyDst {
|
||||
proc.startup.subscriber(proc, CopyDst, nil)
|
||||
proc.startup.startProcess(proc, CopyDst, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubHello {
|
||||
|
@ -149,23 +149,24 @@ func (p *processes) Start(proc process) {
|
|||
|
||||
}
|
||||
}
|
||||
proc.startup.subscriber(proc, Hello, pf)
|
||||
proc.startup.startProcess(proc, Hello, pf)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.IsCentralErrorLogger {
|
||||
proc.startup.subscriber(proc, ErrorLog, nil)
|
||||
proc.startup.startProcess(proc, ErrorLog, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCliCommand {
|
||||
proc.startup.subscriber(proc, CliCommand, nil)
|
||||
proc.startup.startProcess(proc, CliCommand, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubConsole {
|
||||
proc.startup.subscriber(proc, Console, nil)
|
||||
proc.startup.startProcess(proc, Console, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartPubHello != 0 {
|
||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
|
||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
@ -185,13 +186,7 @@ func (p *processes) Start(proc process) {
|
|||
Retries: 1,
|
||||
}
|
||||
|
||||
sam, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: ProcessesStart: %v", err)
|
||||
p.errorKernel.errSend(proc, m, er, logError)
|
||||
}
|
||||
proc.newMessagesCh <- sam
|
||||
proc.newMessagesCh <- m
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
|
@ -203,7 +198,7 @@ func (p *processes) Start(proc process) {
|
|||
}
|
||||
}
|
||||
}
|
||||
proc.startup.publisher(proc, Hello, pf)
|
||||
proc.startup.startProcess(proc, HelloPublisher, pf)
|
||||
}
if proc.configuration.StartProcesses.EnableKeyUpdates {

@@ -236,12 +231,7 @@ func (p *processes) Start(proc process) {
}
proc.nodeAuth.publicKeys.mu.Unlock()

sam, err := newSubjectAndMessage(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
p.errorKernel.errSend(proc, m, err, logError)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- m

select {
case <-ticker.C:

@@ -253,8 +243,8 @@ func (p *processes) Start(proc process) {
}
}
}
proc.startup.publisher(proc, KeysRequestUpdate, pf)
proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
proc.startup.startProcess(proc, KeysRequestUpdate, pf)
proc.startup.startProcess(proc, KeysDeliverUpdate, nil)
}

if proc.configuration.StartProcesses.EnableAclUpdates {

@@ -284,13 +274,7 @@ func (p *processes) Start(proc process) {
}
proc.nodeAuth.nodeAcl.mu.Unlock()

sam, err := newSubjectAndMessage(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
p.errorKernel.errSend(proc, m, err, logError)
log.Printf("error: ProcessesStart: %v\n", err)
}
proc.newMessagesCh <- sam
proc.newMessagesCh <- m

select {
case <-ticker.C:

@@ -302,41 +286,41 @@ func (p *processes) Start(proc process) {
}
}
}
proc.startup.publisher(proc, AclRequestUpdate, pf)
proc.startup.subscriber(proc, AclDeliverUpdate, nil)
proc.startup.startProcess(proc, AclRequestUpdate, pf)
proc.startup.startProcess(proc, AclDeliverUpdate, nil)
}
if proc.configuration.StartProcesses.IsCentralAuth {
proc.startup.subscriber(proc, KeysRequestUpdate, nil)
proc.startup.subscriber(proc, KeysAllow, nil)
proc.startup.subscriber(proc, KeysDelete, nil)
proc.startup.subscriber(proc, AclRequestUpdate, nil)
proc.startup.subscriber(proc, AclAddCommand, nil)
proc.startup.subscriber(proc, AclDeleteCommand, nil)
proc.startup.subscriber(proc, AclDeleteSource, nil)
proc.startup.subscriber(proc, AclGroupNodesAddNode, nil)
proc.startup.subscriber(proc, AclGroupNodesDeleteNode, nil)
proc.startup.subscriber(proc, AclGroupNodesDeleteGroup, nil)
proc.startup.subscriber(proc, AclGroupCommandsAddCommand, nil)
proc.startup.subscriber(proc, AclGroupCommandsDeleteCommand, nil)
proc.startup.subscriber(proc, AclGroupCommandsDeleteGroup, nil)
proc.startup.subscriber(proc, AclExport, nil)
proc.startup.subscriber(proc, AclImport, nil)
proc.startup.startProcess(proc, KeysRequestUpdate, nil)
proc.startup.startProcess(proc, KeysAllow, nil)
proc.startup.startProcess(proc, KeysDelete, nil)
proc.startup.startProcess(proc, AclRequestUpdate, nil)
proc.startup.startProcess(proc, AclAddCommand, nil)
proc.startup.startProcess(proc, AclDeleteCommand, nil)
proc.startup.startProcess(proc, AclDeleteSource, nil)
proc.startup.startProcess(proc, AclGroupNodesAddNode, nil)
proc.startup.startProcess(proc, AclGroupNodesDeleteNode, nil)
proc.startup.startProcess(proc, AclGroupNodesDeleteGroup, nil)
proc.startup.startProcess(proc, AclGroupCommandsAddCommand, nil)
proc.startup.startProcess(proc, AclGroupCommandsDeleteCommand, nil)
proc.startup.startProcess(proc, AclGroupCommandsDeleteGroup, nil)
proc.startup.startProcess(proc, AclExport, nil)
proc.startup.startProcess(proc, AclImport, nil)
}

if proc.configuration.StartProcesses.StartSubHttpGet {
proc.startup.subscriber(proc, HttpGet, nil)
proc.startup.startProcess(proc, HttpGet, nil)
}

if proc.configuration.StartProcesses.StartSubTailFile {
proc.startup.subscriber(proc, TailFile, nil)
proc.startup.startProcess(proc, TailFile, nil)
}

if proc.configuration.StartProcesses.StartSubCliCommandCont {
proc.startup.subscriber(proc, CliCommandCont, nil)
proc.startup.startProcess(proc, CliCommandCont, nil)
}

proc.startup.subscriber(proc, PublicKey, nil)
proc.startup.startProcess(proc, PublicKey, nil)
}
// Stop all subscriber processes.

@@ -367,9 +351,9 @@ func newStartup(server *server) *startup {
return &s
}

// subscriber will start a subscriber process. It takes the initial process, request method,
// startProcess will start a process. It takes the initial process, request method,
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
p.errorKernel.logDebug(er)

@@ -382,23 +366,10 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
}

fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
proc := newProcess(p.ctx, p.processes.server, sub)
proc.procFunc = pf

go proc.spawnWorker()
}
// publisher will start a publisher process. It takes the initial process, request method,
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er)
sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
proc.procFunc = pf
proc.isLongRunningPublisher = true

go proc.spawnWorker()
go proc.start()
}

// ---------------------------------------------------------------

@@ -412,7 +383,7 @@ func (p *processes) printProcessesMap() {
p.active.mu.Lock()

for pName, proc := range p.active.procNames {
er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name())
er := fmt.Errorf("info: proc - procName in map: %v , id: %v, subject: %v", pName, proc.processID, proc.subject.name())
proc.errorKernel.logDebug(er)
}
requests.go (15 changed lines)

@@ -104,8 +104,9 @@ const (
SUBCopySrc Method = "subCopySrc"
// Write the destination copied to some node.
SUBCopyDst Method = "subCopyDst"
// Send Hello I'm here message.
Hello Method = "hello"
// Hello I'm here message.
Hello Method = "hello"
HelloPublisher Method = "helloPublisher"
// Error log methods to centralError node.
ErrorLog Method = "errorLog"
// Http Get

@@ -187,6 +188,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
SUBCopySrc: HandlerFunc(methodSUB),
SUBCopyDst: HandlerFunc(methodSUB),
Hello: HandlerFunc(methodHello),
HelloPublisher: HandlerFunc(nil),
ErrorLog: HandlerFunc(methodErrorLog),
HttpGet: HandlerFunc(methodHttpGet),
HttpGetScheduled: HandlerFunc(methodHttpGetScheduled),

@@ -352,14 +354,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
PreviousMessage: &thisMsg,
}

sam, err := newSubjectAndMessage(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logError)
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- newMsg
}

// selectFileNaming will figure out the correct naming of the file
@@ -55,7 +55,10 @@ func methodCliCommand(proc process, message Message, node string) ([]byte, error
// data payload there.
var foundEnvData bool
var envData string
//var foundEnvFile bool
//var envFile string
for i, v := range message.MethodArgs {
// Check if to use the content of the data field are specified.
if strings.Contains(v, "{{CTRL_DATA}}") {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
requests_copy.go (101 changed lines)

@@ -86,26 +86,12 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// we should forward the message to that specified toNode. This will allow
// us to initiate a file copy from another node to this node.
if message.ToNode != proc.node {
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- message

return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
}

// Check if the filepaths for the socket a realpaths.
file := filepath.Join(message.Directory, message.FileName)
if strings.HasPrefix(file, "./") || !strings.HasPrefix(file, "/") {
er := fmt.Errorf("error: copySrcSubHandler: path in message started with ./ or no directory at all, only full paths are allowed, path given was : %v", file)
proc.errorKernel.errSend(proc, message, er, logError)
newReplyMessage(proc, message, []byte(er.Error()))
return nil, er
}

var subProcessName string

proc.processes.wg.Add(1)

@@ -225,7 +211,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {

// Create a new sub process that will do the actual file copying.

copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
copySrcSubProc := newSubProcess(ctx, proc.server, sub)

// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.

@@ -235,7 +221,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
copySrcSubProc.handler = copySrcSubHandler()

// The process will be killed when the context expires.
go copySrcSubProc.spawnWorker()
go copySrcSubProc.start()

// Send a message over the the node where the destination file will be written,
// to also start up a sub process on the destination node.

@@ -260,14 +246,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// msg.Directory = dstDir
// msg.FileName = dstFile

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
cancel()
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)

@@ -280,8 +259,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
}

// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
p := newProcess(ctx, server, subject, processKind)
func newSubProcess(ctx context.Context, server *server, subject Subject) process {
p := newProcess(ctx, server, subject)
p.isSubProcess = true

return p

@@ -333,7 +312,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// previous message is then fully up and running, so we just discard
// that second message in those cases.

pn := processNameGet(sub.name(), processKindSubscriber)
pn := processNameGet(sub.name())
// fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn)

proc.processes.active.mu.Lock()

@@ -352,7 +331,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
}

// Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
copyDstSubProc := newSubProcess(ctx, proc.server, sub)

// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.

@@ -362,7 +341,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
copyDstSubProc.handler = copyDstSubHandler()

// The process will be killed when the context expires.
go copyDstSubProc.spawnWorker()
go copyDstSubProc.start()

fp := filepath.Join(cia.DstDir, cia.DstFile)
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName)

@@ -445,8 +424,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
// We don't care about the error.
fi, err := os.Stat(file)
if err != nil {
fmt.Printf(" ** DEBUG: STAT ERROR: %v\n", err)
fmt.Printf(" ** DEBUG: fi: %#v\n", fi)
er := fmt.Errorf("DEBUG: ERROR while os.Stat(file): copySrcProcFunc, fileInfo: %v, err: %v", fi, err)
proc.errorKernel.errSend(proc, Message{}, er, logWarning)

}

// We want to be able to send the reply message when the copying is done,

@@ -558,15 +538,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
ReplyRetries: initialMessage.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

resendRetries = 0

@@ -628,15 +600,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
ReplyRetries: initialMessage.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

resendRetries++

@@ -692,14 +656,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
}

// Open a tmp folder for where to write the received chunks

@@ -794,14 +751,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

case copyResendLast:
// The csa already contains copyStatus copyResendLast when reached here,

@@ -827,14 +777,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

case copySrcDone:
err := func() error {

@@ -847,7 +790,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
proc.errorKernel.logDebug(er)

if _, err := os.Stat(cia.DstDir); os.IsNotExist(err) {
// TODO: Add option to set permission here ???
err := os.MkdirAll(cia.DstDir, fs.FileMode(cia.FolderPermission))
if err != nil {
return fmt.Errorf("error: failed to create destination directory for file copying %v: %v", cia.DstDir, err)

@@ -981,14 +923,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
ReplyRetries: message.ReplyRetries,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg
}

cancel()
@@ -17,8 +17,6 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
fileName, folderTree := selectFileNaming(message, proc)
file := filepath.Join(folderTree, fileName)

fmt.Printf("******************* DEBUG: CHECK IF SOCKET OR FILE: %v\n", file)

// log.Fatalf("EXITING\n")

// Check the file is a unix socket, and if it is we write the

@@ -70,8 +68,8 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
// Open file and write data.
f, err := os.OpenFile(file, fileFlag, 0755)
if err != nil {
er := fmt.Errorf("error: methodToFile/Append: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)

er := fmt.Errorf("error: methodToFile/Append: failed to open file %v, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", file, message.Directory, message.FileName, err)
fmt.Printf("%v\n", er)
return er
}
defer f.Close()
@@ -125,10 +125,6 @@ func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte
func methodKeysDeliverUpdate(proc process, message Message, node string) ([]byte, error) {
// Get a context with the timeout specified in message.MethodTimeout.

// TODO:
// - Since this is implemented as a NACK message we could implement a
// metric thats shows the last time keys were updated.

ctx, _ := getContextForMethodTimeout(proc.ctx, message)

proc.processes.wg.Add(1)

@@ -327,14 +323,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
ACKTimeout: 0,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
proc.errorKernel.logDebug(er)

@@ -374,14 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
ACKTimeout: 0,
}

sam, err := newSubjectAndMessage(msg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
}

proc.newMessagesCh <- sam
proc.newMessagesCh <- msg

er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
proc.errorKernel.logDebug(er)
@@ -23,7 +23,7 @@ func methodOpProcessList(proc process, message Message, node string) ([]byte, er

proc.processes.active.mu.Lock()
for _, pTmp := range proc.processes.active.procNames {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
s := fmt.Sprintf("%v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processID, pTmp.subject.name())
sb := []byte(s)
out = append(out, sb...)

@@ -68,8 +68,8 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e

// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
go procNew.spawnWorker()
procNew := newProcess(proc.ctx, proc.server, sub)
go procNew.start()

txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf("%v", txt)

@@ -115,7 +115,6 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er

methodString := message.MethodArgs[0]
node := message.MethodArgs[1]
kind := message.MethodArgs[2]

method := Method(methodString)
tmpH := mt.getHandler(Method(method))

@@ -132,7 +131,7 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(method, string(node))
processName := processNameGet(sub.name(), processKind(kind))
processName := processNameGet(sub.name())

// Remove the process from the processes active map if found.
proc.processes.active.mu.Lock()
|
|||
func newNatsServerForTesting(port int) *natsserver.Server {
|
||||
// Start up the nats-server message broker.
|
||||
nsOpt := &natsserver.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
JetStream: true,
|
||||
}
|
||||
|
||||
ns, err := natsserver.NewServer(nsOpt)
|
||||
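The test broker now enables JetStream so the new stream-based request types can be exercised in tests. A minimal, self-contained sketch of starting an embedded nats-server with JetStream, using the same upstream library the test helper imports (the port, store directory, and timeout below are assumptions made for the example, not values from this commit):

package main

import (
	"log"
	"time"

	natsserver "github.com/nats-io/nats-server/v2/server"
)

func main() {
	// Enable JetStream on an embedded broker, roughly as newNatsServerForTesting does.
	opts := &natsserver.Options{
		Host:      "127.0.0.1",
		Port:      42222,           // assumed free port for the example
		JetStream: true,
		StoreDir:  "/tmp/js-store", // assumed directory for JetStream state
	}

	ns, err := natsserver.NewServer(opts)
	if err != nil {
		log.Fatalf("failed to create nats server: %v", err)
	}

	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatal("nats server did not become ready")
	}
	log.Printf("embedded nats server with JetStream running on %v", ns.ClientURL())

	ns.Shutdown()
}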
@@ -284,12 +285,7 @@ func TestRequest(t *testing.T) {
for _, tt := range tests {
switch tt.viaSocketOrCh {
case viaCh:
sam, err := newSubjectAndMessage(tt.message)
if err != nil {
t.Fatalf("newSubjectAndMessage failed: %v\n", err)
}

tstSrv.newMessagesCh <- sam
tstSrv.newMessagesCh <- tt.message

case viaSocket:
msgs := []Message{tt.message}

@@ -338,6 +334,7 @@ func TestRequest(t *testing.T) {
checkREQTailFileTest(tstConf, t, tstTempDir)
checkMetricValuesTest(tstSrv, t)
checkErrorKernelMalformedJSONtest(tstConf, t)
t.Log("*******starting with checkREQCopySrc\n")
checkREQCopySrc(tstConf, t, tstTempDir)
}

@@ -449,7 +446,8 @@ func checkREQCopySrc(conf *Configuration, t *testing.T, tmpDir string) error {
"methodArgs": ["` + srcfp + `","central","` + dstfp + `","20","10"],
"ACKTimeout":5,
"retries":3,
"methodTimeout": 10
"methodTimeout": 10,
"fileName": "filecopy2.log"
}
]`
server.go (301 changed lines)

@@ -5,15 +5,20 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/jinzhu/copier"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
)

@@ -21,9 +26,8 @@ import (
type processName string

// Will return a process name made up of subjectName+processKind
func processNameGet(sn subjectName, pk processKind) processName {
pn := fmt.Sprintf("%s_%s", sn, pk)
return processName(pn)
func processNameGet(sn subjectName) processName {
return processName(pn)
}
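With processKind removed, a process name is now simply the subject name instead of subjectName plus a kind suffix. A small, self-contained illustration of the before/after naming scheme (the subject string used here is made up for the example; the real value comes from subject.name() in ctrl):

package main

import "fmt"

func main() {
	// Hypothetical subject name, for illustration only.
	subjectName := "node1.cliCommand"

	oldName := fmt.Sprintf("%s_%s", subjectName, "subscriber") // previous scheme: subjectName_processKind
	newName := subjectName                                     // new scheme: the subject name is the process name

	fmt.Println(oldName) // node1.cliCommand_subscriber
	fmt.Println(newName) // node1.cliCommand
}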
// server is the structure that will hold the state about spawned

@@ -48,9 +52,9 @@ type server struct {
//
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan subjectAndMessage
// directSAMSCh
samSendLocalCh chan []subjectAndMessage
newMessagesCh chan Message
// messageDeliverLocalCh
messageDeliverLocalCh chan []Message
// Channel for messages to publish with Jetstream.
jetstreamPublishCh chan Message
// errorKernel is doing all the error handling like what to do if

@@ -74,7 +78,8 @@ type server struct {
// message ID
messageID messageID
// audit logging
auditLogCh chan []subjectAndMessage
auditLogCh chan []Message
zstdEncoder *zstd.Encoder
}

type messageID struct {

@@ -114,7 +119,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
// return nil, fmt.Errorf("error: failed to create tmp seed file: %v", err)
// }

err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0700)
err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0600)
if err != nil {
cancel()
return nil, fmt.Errorf("error: failed to write temp seed file: %v", err)

@@ -126,13 +131,14 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
return nil, fmt.Errorf("error: failed to read temp nkey seed file: %v", err)
}

defer func() {
err = os.Remove(pth)
if err != nil {
cancel()
log.Fatalf("error: failed to remove temp seed file: %v\n", err)
}
}()
// // TODO: REMOVED for testing
//defer func() {
// err = os.Remove(pth)
// if err != nil {
// cancel()
// log.Fatalf("error: failed to remove temp seed file: %v\n", err)
// }
//}()

case configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "":
var err error

@@ -212,23 +218,36 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
centralAuth := newCentralAuth(configuration, errorKernel)
//}

zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
log.Fatalf("error: zstd new encoder failed: %v", err)
}

defer func() {
go func() {
<-ctx.Done()
zstdEncoder.Close()
}()
}()

s := server{
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan subjectAndMessage),
samSendLocalCh: make(chan []subjectAndMessage),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan Message),
messageDeliverLocalCh: make(chan []Message),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []Message),
zstdEncoder: zstdEncoder,
}

s.processes = newProcesses(ctx, &s)

@@ -340,7 +359,7 @@ func (s *server) Start() {
//
// The context of the initial process are set in processes.Start.
sub := newSubject(Initial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s, sub, "")
s.processInitial = newProcess(context.TODO(), s, sub)
// Start all wanted subscriber processes.
s.processes.Start(s.processInitial)

@@ -358,7 +377,7 @@ func (s *server) Start() {
}

// Start the processing of new messages from an input channel.
s.routeMessagesToProcess()
s.routeMessagesToPublisherProcess()

// Start reading the channel for injecting direct messages that should
// not be sent via the message broker.

@@ -381,11 +400,11 @@ func (s *server) startAuditLog(ctx context.Context) {

for {
select {
case sams := <-s.auditLogCh:
case messages := <-s.auditLogCh:

for _, sam := range sams {
for _, message := range messages {
msgForPermStore := Message{}
copier.Copy(&msgForPermStore, sam.Message)
copier.Copy(&msgForPermStore, message)
// Remove the content of the data field.
msgForPermStore.Data = nil

@@ -417,27 +436,29 @@ func (s *server) directSAMSChRead() {
case <-s.ctx.Done():
log.Printf("info: stopped the directSAMSCh reader\n\n")
return
case sams := <-s.samSendLocalCh:
case messages := <-s.messageDeliverLocalCh:
// fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams)
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
for i := range messages {
// TODO: !!!!!! Shoud the node here be the fromNode ???????
subject := newSubject(messages[i].Method, string(messages[i].ToNode))
processName := processNameGet(subject.name())

s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock()

mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
mh, ok := p.methodsAvailable.CheckIfExists(messages[i].Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Method)
p.errorKernel.errSend(p, sams[i].Message, er, logError)
p.errorKernel.errSend(p, messages[i], er, logError)
continue
}

p.handler = mh

go executeHandler(p, sams[i].Message, s.nodeName)
go executeHandler(p, messages[i], s.nodeName)
}
}
}

@@ -471,7 +492,7 @@ func (s *server) Stop() {

}

// routeMessagesToProcess takes a database name it's input argument.
// routeMessagesToPublisherProcess takes a database name it's input argument.
// The database will be used as the persistent k/v store for the work
// queue which is implemented as a ring buffer.
// The ringBufferInCh are where we get new messages to publish.

@@ -480,7 +501,7 @@ func (s *server) Stop() {
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
func (s *server) routeMessagesToProcess() {
func (s *server) routeMessagesToPublisherProcess() {
// Start reading new fresh messages received on the incomming message
// pipe/file.

@@ -492,117 +513,85 @@ func (s *server) routeMessagesToProcess() {
methodsAvailable := method.GetMethodsAvailable()

go func() {
for sam := range s.newMessagesCh {
for message := range s.newMessagesCh {

go func(sam subjectAndMessage) {
// TODO: Jetstream
// Check if Jetstream stream are specified,
// and send off to Jetstream publisher.
go func(message Message) {

s.messageID.mu.Lock()
s.messageID.id++
sam.Message.ID = s.messageID.id
message.ID = s.messageID.id
s.messageID.mu.Unlock()

s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID))
s.metrics.promMessagesProcessedIDLast.Set(float64(message.ID))

// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
if _, ok := methodsAvailable.CheckIfExists(message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", message.Method)
s.errorKernel.errSend(s.processInitial, message, er, logError)
return
}

switch {
case sam.Message.Retries < 0:
sam.Message.Retries = s.configuration.DefaultMessageRetries
case message.Retries < 0:
message.Retries = s.configuration.DefaultMessageRetries
}
if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout
if message.MethodTimeout < 1 && message.MethodTimeout != -1 {
message.MethodTimeout = s.configuration.DefaultMethodTimeout
}

// ---
// Check for {{CTRL_FILE}} and if we should read and load a local file into
// the message before sending.

m := sam.Message
var filePathToOpen string
foundFile := false
var argPos int
for i, v := range message.MethodArgs {
if strings.Contains(v, "{{CTRL_FILE:") {
foundFile = true
argPos = i

subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
// Example to split:
// echo {{CTRL_FILE:/somedir/msg_file.yaml}}>ctrlfile.txt
//
// Split at colon. We want the part after.
ss := strings.Split(v, ":")
// Split at "}}",so pos [0] in the result contains just the file path.
sss := strings.Split(ss[1], "}}")
filePathToOpen = sss[0]

sendOK := func() bool {
var ctxCanceled bool

s.processes.active.mu.Lock()
defer s.processes.active.mu.Unlock()

// Check if the process exist, if it do not exist return false so a
// new publisher process will be created.
proc, ok := s.processes.active.procNames[pn]
if !ok {
return false
}

if proc.ctx.Err() != nil {
ctxCanceled = true
}
if ok && ctxCanceled {
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
s.errorKernel.logDebug(er)
delete(proc.processes.active.procNames, proc.processName)
return false
}

// If found in map above, and go routine for publishing is running,
// put the message on that processes incomming message channel.
if ok && !ctxCanceled {
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}

return true
}

// The process was not found, so we return false here so a new publisher
// process will be created later.
return false
}()

if sendOK {
return
}

er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
s.errorKernel.logDebug(er)
if foundFile {

fh, err := os.Open(filePathToOpen)
if err != nil {
er := fmt.Errorf("error: routeMessagesToPublisherProcess: failed to open file given as CTRL_FILE argument: %v", err)
s.errorKernel.logError(er)
return
}
defer fh.Close()

b, err := io.ReadAll(fh)
if err != nil {
er := fmt.Errorf("error: routeMessagesToPublisherProcess: failed to read file %v given as CTRL_FILE argument: %v", filePathToOpen, err)
s.errorKernel.logError(er)
return
}

// Replace the {{CTRL_FILE}} with the actual content read from file.
re := regexp.MustCompile(`(.*)({{CTRL_FILE.*}})(.*)`)
message.MethodArgs[argPos] = re.ReplaceAllString(message.MethodArgs[argPos], `${1}`+string(b)+`${3}`)
// ---

sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher)
}

proc.spawnWorker()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logDebug(er)
message.ArgSignature = s.processInitial.addMethodArgSignature(message)

// Now when the process is spawned we continue,
// and send the message to that new process.
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
go s.processInitial.publishAMessage(message, s.natsConn)

}(sam)
}(message)

}
}()
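The {{CTRL_FILE}} handling above extracts the file path by splitting the argument first on ":" and then on "}}", reads the file, and substitutes the content back into the argument with a regexp. A small, self-contained sketch of that substitution; the temporary file and the example argument are made up for illustration, and the split/regexp logic mirrors the approach used in routeMessagesToPublisherProcess:

package main

import (
	"fmt"
	"os"
	"regexp"
	"strings"
)

func main() {
	// Write a temporary file that stands in for the local file referenced
	// by the {{CTRL_FILE:...}} placeholder.
	tmp, err := os.CreateTemp("", "ctrlfile-*.yaml")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmp.Name())
	if _, err := tmp.WriteString("hello from file"); err != nil {
		panic(err)
	}
	tmp.Close()

	arg := fmt.Sprintf("echo {{CTRL_FILE:%s}}>ctrlfile.txt", tmp.Name())

	// Split at the colon, then at "}}", so the first element holds just the file path.
	ss := strings.Split(arg, ":")
	sss := strings.Split(ss[1], "}}")
	filePathToOpen := sss[0]

	b, err := os.ReadFile(filePathToOpen)
	if err != nil {
		panic(err)
	}

	// Replace the whole placeholder with the file content.
	re := regexp.MustCompile(`(.*)({{CTRL_FILE.*}})(.*)`)
	result := re.ReplaceAllString(arg, `${1}`+string(b)+`${3}`)

	fmt.Println(result) // echo hello from file>ctrlfile.txt
}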
@@ -632,3 +621,59 @@ func (s *server) exposeDataFolder() {
os.Exit(1)

}

// messageSerializeAndCompress will serialize and compress the Message, and
// return the result as a []byte.
func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {

// encode the message structure into cbor
bSerialized, err := cbor.Marshal(msg)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
s.errorKernel.logDebug(er)
return nil, er
}

// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.

bCompressed := s.zstdEncoder.EncodeAll(bSerialized, nil)

return bCompressed, nil
}

// messageDeserializeAndUncompress will deserialize the ctrl message
func (s *server) messageDeserializeAndUncompress(msgData []byte) (Message, error) {

// // If debugging is enabled, print the source node name of the nats messages received.
// headerFromNode := msg.Headers().Get("fromNode")
// if headerFromNode != "" {
// er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
// s.errorKernel.logDebug(er)
// }

zr, err := zstd.NewReader(nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err)
return Message{}, er
}
msgData, err = zr.DecodeAll(msgData, nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err)
zr.Close()
return Message{}, er
}

zr.Close()

message := Message{}

err = cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, error: %v", err)
return Message{}, er
}

return message, nil
}
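These two helpers are inverses of each other: cbor.Marshal followed by zstd EncodeAll on the way out, zstd DecodeAll followed by cbor.Unmarshal on the way in. A self-contained round-trip sketch using the same two libraries the commit imports; the payload struct and its field values are stand-ins for ctrl's Message type, not the real thing:

package main

import (
	"fmt"
	"log"

	"github.com/fxamacker/cbor/v2"
	"github.com/klauspost/compress/zstd"
)

// payload is a stand-in for ctrl's Message type.
type payload struct {
	Method string
	ToNode string
	Data   []byte
}

func main() {
	in := payload{Method: "cliCommand", ToNode: "node1", Data: []byte("ls -l")}

	// Serialize with cbor, then compress with zstd.
	serialized, err := cbor.Marshal(in)
	if err != nil {
		log.Fatalf("cbor marshal failed: %v", err)
	}
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
	if err != nil {
		log.Fatalf("zstd encoder failed: %v", err)
	}
	compressed := enc.EncodeAll(serialized, nil)
	enc.Close()

	// Uncompress with zstd, then deserialize with cbor.
	dec, err := zstd.NewReader(nil)
	if err != nil {
		log.Fatalf("zstd decoder failed: %v", err)
	}
	uncompressed, err := dec.DecodeAll(compressed, nil)
	dec.Close()
	if err != nil {
		log.Fatalf("zstd decode failed: %v", err)
	}

	var out payload
	if err := cbor.Unmarshal(uncompressed, &out); err != nil {
		log.Fatalf("cbor unmarshal failed: %v", err)
	}

	fmt.Printf("round trip ok: %+v (compressed %d bytes)\n", out, len(compressed))
}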