diff --git a/TODO.md b/TODO.md
index 49b26d3..07111f8 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,4 +1,3 @@
-TODO:
+# TODO
 
-- Create {{ file: }} to be used within methodArguments
-- Add option to send request types with Jetstream.
\ No newline at end of file
+## Key and ACL updates to use jetstream
diff --git a/cmd/ctrl/main.go b/cmd/ctrl/main.go
index a2d9153..a0a9ba2 100644
--- a/cmd/ctrl/main.go
+++ b/cmd/ctrl/main.go
@@ -21,20 +21,25 @@ import (
 var version string
 
 func main() {
-	defer profile.Start(profile.BlockProfile).Stop()
-	//defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
-	//defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
-	//defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop()
 	c := ctrl.NewConfiguration()
-	// err := c.CheckFlags(version)
-	// if err != nil {
-	// 	log.Printf("%v\n", err)
-	// 	return
-	// }
 
 	// Start profiling if profiling port is specified
 	if c.ProfilingPort != "" {
+
+		switch c.Profiling {
+		case "block":
+			defer profile.Start(profile.BlockProfile).Stop()
+		case "cpu":
+			defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
+		case "trace":
+			defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
+		case "mem":
+			defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop()
+		default:
+			log.Fatalf("error: profiling port defined, but no valid profiling type defined. Check --help. Got: %v\n", c.Profiling)
+		}
+
 		go func() {
 			http.ListenAndServe("localhost:"+c.ProfilingPort, nil)
 		}()
diff --git a/configuration_flags.go b/configuration_flags.go
index 92e03d5..ef0e27e 100644
--- a/configuration_flags.go
+++ b/configuration_flags.go
@@ -46,12 +46,16 @@ type Configuration struct {
 	KeysUpdateInterval int `comment:"KeysUpdateInterval in seconds"`
 	// AclUpdateInterval in seconds
 	AclUpdateInterval int `comment:"AclUpdateInterval in seconds"`
+	// The type of profiling
+	Profiling string
 	// The number of the profiling port
 	ProfilingPort string `comment:"The number of the profiling port"`
 	// Host and port for prometheus listener, e.g. localhost:2112
 	PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"`
 	// Comma separated list of additional streams to consume from.
-	JetstreamsConsume string
+	JetstreamsConsume string `comment:"a comma separated list of other jetstream subjects to consume"`
+	// Jetstream MaxMsgsPerSubject
+	JetStreamMaxMsgsPerSubject int `comment:"max messages to keep on the broker for a jetstream subject"`
 	// Set to true if this is the node that should receive the error log's from other nodes
 	DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"`
 	// Default value for how long can a request method max be allowed to run in seconds
@@ -78,10 +82,6 @@ type Configuration struct {
 	ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"`
 	// Retries for error messages
 	ErrorMessageRetries int `comment:"Retries for error messages"`
-	// Compression z for zstd or g for gzip
-	Compression string `comment:"Compression z for zstd or g for gzip"`
-	// Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor
-	Serialization string `comment:"Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor"`
 	// SetBlockProfileRate for block profiling
 	SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"`
 	// EnableSocket for enabling the creation of a ctrl.sock file
@@ -165,9 +165,11 @@ func NewConfiguration() *Configuration {
 	flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", CheckEnv("NATS_RECONNECT_JITTER_TLS", c.NatsReconnectJitterTLS).(int), "default nats ReconnectJitterTLS interval in seconds.")
 	flag.IntVar(&c.KeysUpdateInterval, "keysUpdateInterval", CheckEnv("KEYS_UPDATE_INTERVAL", c.KeysUpdateInterval).(int), "default interval in seconds for asking the central for public keys")
 	flag.IntVar(&c.AclUpdateInterval, "aclUpdateInterval", CheckEnv("ACL_UPDATE_INTERVAL", c.AclUpdateInterval).(int), "default interval in seconds for asking the central for acl updates")
+	flag.StringVar(&c.Profiling, "profiling", CheckEnv("PROFILING", c.Profiling).(string), "type of profiling: cpu/block/trace/mem/heap")
 	flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port")
 	flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112")
 	flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from")
+	flag.IntVar(&c.JetStreamMaxMsgsPerSubject, "jetstreamMaxMsgsPerSubject", CheckEnv("JETSTREAM_MAX_MSGS_PER_SUBJECT", c.JetStreamMaxMsgsPerSubject).(int), "max messages to keep on the broker per jetstream subject")
 	flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level")
 	flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system")
 	flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run")
@@ -180,8 +182,6 @@ func NewConfiguration() *Configuration {
 	flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
 	flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out")
 	flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it")
-	flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
-	flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
 	flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
 	flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
 	flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
@@ -228,46 +228,46 @@ func NewConfiguration() *Configuration {
 // Get a Configuration struct with the default values set.
 func newConfigurationDefaults() Configuration {
 	c := Configuration{
-		ConfigFolder:              "./etc/",
-		SocketFolder:              "./tmp",
-		ReadFolder:                "./readfolder",
-		EnableReadFolder:          true,
-		TCPListener:               "",
-		HTTPListener:              "",
-		DatabaseFolder:            "./var/lib",
-		NodeName:                  "",
-		BrokerAddress:             "127.0.0.1:4222",
-		NatsConnOptTimeout:        20,
-		NatsConnectRetryInterval:  10,
-		NatsReconnectJitter:       100,
-		NatsReconnectJitterTLS:    1,
-		KeysUpdateInterval:        60,
-		AclUpdateInterval:         60,
-		ProfilingPort:             "",
-		PromHostAndPort:           "",
-		JetstreamsConsume:         "",
-		DefaultMessageTimeout:     10,
-		DefaultMessageRetries:     1,
-		DefaultMethodTimeout:      10,
-		SubscribersDataFolder:     "./data",
-		CentralNodeName:           "central",
-		RootCAPath:                "",
-		NkeySeedFile:              "",
-		NkeyFromED25519SSHKeyFile: "",
-		NkeySeed:                  "",
-		ExposeDataFolder:          "",
-		ErrorMessageTimeout:       60,
-		ErrorMessageRetries:       10,
-		Compression:               "z",
-		Serialization:             "cbor",
-		SetBlockProfileRate:       0,
-		EnableSocket:              true,
-		EnableSignatureCheck:      false,
-		EnableAclCheck:            false,
-		EnableDebug:               false,
-		LogLevel:                  "debug",
-		LogConsoleTimestamps:      false,
-		KeepPublishersAliveFor:    10,
+		ConfigFolder:               "./etc/",
+		SocketFolder:               "./tmp",
+		ReadFolder:                 "./readfolder",
+		EnableReadFolder:           true,
+		TCPListener:                "",
+		HTTPListener:               "",
+		DatabaseFolder:             "./var/lib",
+		NodeName:                   "",
+		BrokerAddress:              "127.0.0.1:4222",
+		NatsConnOptTimeout:         20,
+		NatsConnectRetryInterval:   10,
+		NatsReconnectJitter:        100,
+		NatsReconnectJitterTLS:     1,
+		KeysUpdateInterval:         60,
+		AclUpdateInterval:          60,
+		Profiling:                  "",
+		ProfilingPort:              "",
+		PromHostAndPort:            "",
+		JetstreamsConsume:          "",
+		JetStreamMaxMsgsPerSubject: 100,
+		DefaultMessageTimeout:      10,
+		DefaultMessageRetries:      1,
+		DefaultMethodTimeout:       10,
+		SubscribersDataFolder:      "./data",
+		CentralNodeName:            "central",
+		RootCAPath:                 "",
+		NkeySeedFile:               "",
+		NkeyFromED25519SSHKeyFile:  "",
+		NkeySeed:                   "",
+		ExposeDataFolder:           "",
+		ErrorMessageTimeout:        60,
+		ErrorMessageRetries:        10,
+		SetBlockProfileRate:        0,
+		EnableSocket:               true,
+		EnableSignatureCheck:       false,
+		EnableAclCheck:             false,
+		EnableDebug:                false,
+		LogLevel:                   "debug",
+		LogConsoleTimestamps:       false,
+		KeepPublishersAliveFor:     10,
 
 		StartProcesses: StartProcesses{
 			StartPubHello: 30,
diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md
index dd0b383..61c1c3c 100644
--- a/doc/src/SUMMARY.md
+++ b/doc/src/SUMMARY.md
@@ -15,7 +15,8 @@
 - [Request Methods](./core_request_methods.md)
 - [Nats timeouts](./core_nats_timeouts.md)
 - [Startup folder](./core_startup_folder.md)
-- [{{ctrl_DATA}} variable](./core_messaging_ctrl_DATA.md)
+- [{{CTRL_DATA}} variable](./core_messaging_CTRL_DATA.md)
+- [{{CTRL_FILE}} variable](./core_messaging_CTRL_FILE.md)
 - [Errors](./core_errors.md)
 
 # Example standard messages
diff --git a/doc/src/core_messaging_CTRL_FILE.md b/doc/src/core_messaging_CTRL_FILE.md
new file mode 100644
index 0000000..96c9c37
--- /dev/null
+++ b/doc/src/core_messaging_CTRL_FILE.md
@@ -0,0 +1,23 @@
+# Message methodArgs variables
+
+## {{CTRL_FILE}}
+
+Read a local text file, and embed the content of the file into the methodArgs.
+
+```yaml
+---
+- toNodes:
+    - btdev1
+  #jetstreamToNode: btdev1
+  method: cliCommand
+  methodArgs:
+    - /bin/bash
+    - -c
+    - |
+      echo {{CTRL_FILE:/some_directory/source_file.yaml}}>/other_directory/destination_file.yaml
+  methodTimeout: 3
+  replyMethod: console
+  ACKTimeout: 0
+```
+
+In the above example, the content of the file `/some_directory/source_file.yaml` is read before the message is sent. When the message is received at its destination node and the cliCommand is executed, the content will be written to `/other_directory/destination_file.yaml`.
diff --git a/doc/src/core_messaging_methodargs_variables.md b/doc/src/core_messaging_methodargs_variables.md
new file mode 100644
index 0000000..3356a63
--- /dev/null
+++ b/doc/src/core_messaging_methodargs_variables.md
@@ -0,0 +1 @@
+# Message methodArgs variables
diff --git a/errorkernel.go b/errorkernel.go
index 110e709..9fbdc6d 100644
--- a/errorkernel.go
+++ b/errorkernel.go
@@ -68,7 +68,7 @@ const logNone logLevel = "none"
 // process if it should continue or not based not based on how severe
 // the error where. This should be right after sending the error
 // sending in the process.
-func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
+func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error {
 	// Initiate the slog logger.
 	var replaceFunc func(groups []string, a slog.Attr) slog.Attr
 	if !e.configuration.LogConsoleTimestamps {
@@ -131,13 +131,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error {
 				Retries:    errEvent.process.configuration.ErrorMessageRetries,
 			}
 
-			sam := subjectAndMessage{
-				Subject: newSubject(ErrorLog, "errorCentral"),
-				Message: m,
-			}
-
 			// Put the message on the channel to the ringbuffer.
-			ringBufferBulkInCh <- sam
+			ringBufferBulkInCh <- m
 
 			// if errEvent.process.configuration.EnableDebug {
 			// 	log.Printf("%v\n", er)
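Reviewer note, not part of the patch: the heart of this refactor is that producers now hand plain `Message` values to channels such as `ringBufferBulkInCh`, and the subject is derived later at publish time. A minimal, runnable model of that flow, with `Message` trimmed down to a few stand-in fields:

```go
// Sketch only: producers no longer wrap a Message in subjectAndMessage
// before handing it to the error kernel; the raw Message is enough.
package main

import "fmt"

type Node string

type Message struct {
	ToNode  Node
	Method  string
	Data    []byte
	Retries int
}

func main() {
	ringBufferBulkInCh := make(chan Message, 1)

	// Before this patch the sender also had to build a Subject here.
	ringBufferBulkInCh <- Message{
		ToNode:  "errorCentral",
		Method:  "errorLog",
		Data:    []byte("some error text"),
		Retries: 3,
	}

	m := <-ringBufferBulkInCh
	fmt.Printf("queued error message for %v via %v\n", m.ToNode, m.Method)
}
```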
diff --git a/message_and_subject.go b/message_and_subject.go
index c28073a..9949a2b 100644
--- a/message_and_subject.go
+++ b/message_and_subject.go
@@ -101,11 +101,6 @@ type Subject struct {
 	ToNode string `json:"node" yaml:"toNode"`
 	// method, what is this message doing, etc. CLICommand, Syslog, etc.
 	Method Method `json:"method" yaml:"method"`
-	// messageCh is used by publisher kind processes to read new messages
-	// to be published. The content on this channel have been routed here
-	// from routeMessagesToPublish in *server.
-	// This channel is only used for publishing processes.
-	messageCh chan Message
 }
 
 // newSubject will return a new variable of the type subject, and insert
@@ -124,9 +119,8 @@ func newSubject(method Method, node string) Subject {
 	}
 
 	return Subject{
-		ToNode:    node,
-		Method:    method,
-		messageCh: make(chan Message),
+		ToNode: node,
+		Method: method,
 	}
 }
 
@@ -139,9 +133,8 @@ func newSubjectNoVerifyHandler(method Method, node string) Subject {
 	// Get the Event type for the Method.
 	return Subject{
-		ToNode:    node,
-		Method:    method,
-		messageCh: make(chan Message),
+		ToNode: node,
+		Method: method,
 	}
 }
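With `messageCh` gone, `Subject` is pure routing data. A small sketch of the slimmed-down type; the `<node>.<method>` layout in `name()` below is an assumption for illustration, not necessarily ctrl's exact naming scheme:

```go
// Sketch: Subject now only carries routing data, no channel.
package main

import "fmt"

type Method string

type Subject struct {
	ToNode string
	Method Method
}

func newSubject(method Method, node string) Subject {
	return Subject{ToNode: node, Method: method}
}

// name derives the NATS subject string; the exact format is assumed here.
func (s Subject) name() string {
	return fmt.Sprintf("%v.%v", s.ToNode, s.Method)
}

func main() {
	sub := newSubject("cliCommand", "btdev1")
	fmt.Println(sub.name()) // btdev1.cliCommand
}
```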
diff --git a/message_readers.go b/message_readers.go
index 65c257f..c50c557 100644
--- a/message_readers.go
+++ b/message_readers.go
@@ -77,7 +77,7 @@ func (s *server) readStartupFolder() {
 		readBytes = bytes.Trim(readBytes, "\x00")
 
 		// unmarshal the JSON into a struct
-		sams, err := s.convertBytesToSAMs(readBytes)
+		messages, err := s.convertBytesToMessages(readBytes)
 		if err != nil {
 			er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
 			s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
@@ -85,38 +85,35 @@ func (s *server) readStartupFolder() {
 		}
 
 		// Check if fromNode field is specified, and remove the message if blank.
-		for i := range sams {
+		for i := range messages {
 
 			// We want to allow the use of nodeName local only in startup folder, and
 			// if used we substite it for the local node name.
-			if sams[i].Message.ToNode == "local" {
-				sams[i].Message.ToNode = Node(s.nodeName)
-				sams[i].Subject.ToNode = s.nodeName
+			if messages[i].ToNode == "local" {
+				messages[i].ToNode = Node(s.nodeName)
 			}
 
 			switch {
-			case sams[i].Message.FromNode == "":
-				// Remove the first message from the slice.
-				sams = append(sams[:i], sams[i+1:]...)
+			case messages[i].FromNode == "":
 				er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message")
 				s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
+				continue
 
-			case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0:
-				// Remove the first message from the slice.
-				sams = append(sams[:i], sams[i+1:]...)
+			case messages[i].ToNode == "" && len(messages[i].ToNodes) == 0:
 				er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message")
 				s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
+				continue
 			}
 		}
 
-		j, err := json.MarshalIndent(sams, "", " ")
+		j, err := json.MarshalIndent(messages, "", " ")
 		if err != nil {
 			log.Printf("test error: %v\n", err)
 		}
 
 		er = fmt.Errorf("%v", string(j))
 		s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
 
-		s.samSendLocalCh <- sams
+		s.messageDeliverLocalCh <- messages
 
 	}
 
@@ -128,29 +125,28 @@ func (s *server) jetstreamPublish() {
 
 	// Create a stream
 	_, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{
-		Name:     "NODES",
-		Subjects: []string{"NODES.>"},
-		// TODO: Create Flag ?
-		MaxMsgsPerSubject: 100,
-		// MaxMsgsPerSubject: 1,
+		Name:              "NODES",
+		Subjects:          []string{"NODES.>"},
+		MaxMsgsPerSubject: int64(s.configuration.JetStreamMaxMsgsPerSubject),
 	})
 
 	// Publish messages.
 	for {
 		select {
-		case jsMSG := <-s.jetstreamPublishCh:
-			b, err := json.Marshal(jsMSG)
+		case msg := <-s.jetstreamPublishCh:
+
+			b, err := s.messageSerializeAndCompress(msg)
 			if err != nil {
 				log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
 			}
 
-			subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode))
+			subject := string(fmt.Sprintf("NODES.%v", msg.JetstreamToNode))
 
 			_, err = js.Publish(s.ctx, subject, b)
 			if err != nil {
 				log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
 			}
 
-			fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG)
+			fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, msg)
 
 		case <-s.ctx.Done():
 		}
 	}
 
@@ -183,7 +179,7 @@ func (s *server) jetstreamConsume() {
 		}
 	}
 
-	er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues)
+	er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %v", filterSubjectValues)
 	s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
 
 	cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{
@@ -201,10 +197,9 @@ func (s *server) jetstreamConsume() {
 		msg.Ack()
 
-		m := Message{}
-		err := json.Unmarshal(msg.Data(), &m)
+		m, err := s.messageDeserializeAndUncompress(msg.Data())
 		if err != nil {
-			er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err)
+			er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
 			s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
 			return
 		}
@@ -214,15 +209,7 @@ func (s *server) jetstreamConsume() {
 		// nodeName of the consumer in the ctrl Message, so we are sure it is handled locally.
 		m.ToNode = Node(s.nodeName)
 
-		sam, err := newSubjectAndMessage(m)
-		if err != nil {
-			er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err)
-			s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
-			return
-		}
-
-		// If a message is received via
-		s.samSendLocalCh <- []subjectAndMessage{sam}
+		s.messageDeliverLocalCh <- []Message{m}
 	})
 
 	defer consumeContext.Stop()
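For reference, a self-contained sketch of the stream shape the patched `jetstreamPublish` sets up, assuming a local NATS server with JetStream enabled. The per-subject cap now comes from the new `JetStreamMaxMsgsPerSubject` setting instead of a hardcoded 100:

```go
// Runnable sketch; assumes a NATS server with JetStream at 127.0.0.1:4222.
package main

import (
	"context"
	"log"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Equivalent of the patched CreateStream call, with the cap coming
	// from configuration rather than a literal.
	maxPerSubject := 100
	_, err = js.CreateStream(ctx, jetstream.StreamConfig{
		Name:              "NODES",
		Subjects:          []string{"NODES.>"},
		MaxMsgsPerSubject: int64(maxPerSubject),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Publish to a per-node subject, mirroring "NODES.<node>".
	if _, err := js.Publish(ctx, "NODES.btdev1", []byte("hello")); err != nil {
		log.Fatal(err)
	}
}
```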
@@ -302,30 +289,30 @@ func (s *server) readSocket() {
 			readBytes = bytes.Trim(readBytes, "\x00")
 
 			// unmarshal the JSON into a struct
-			sams, err := s.convertBytesToSAMs(readBytes)
+			messages, err := s.convertBytesToMessages(readBytes)
 			if err != nil {
 				er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err)
 				s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
 				return
 			}
 
-			for i := range sams {
+			for i := range messages {
 
 				// Fill in the value for the FromNode field, so the receiver
 				// can check this field to know where it came from.
-				sams[i].Message.FromNode = Node(s.nodeName)
+				messages[i].FromNode = Node(s.nodeName)
 
 				// Send an info message to the central about the message picked
 				// for auditing.
-				er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
+				er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, messages[i])
 				s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
 
-				s.newMessagesCh <- sams[i]
+				s.newMessagesCh <- messages[i]
 			}
 
 			// Send the SAM struct to be picked up by the ring buffer.
-			s.auditLogCh <- sams
+			s.auditLogCh <- messages
 
 		}(conn)
 	}
 
@@ -381,47 +368,45 @@ func (s *server) readFolder() {
 				}
 				fh.Close()
 
-				fmt.Printf("------- DEBUG: %v\n", b)
-
 				b = bytes.Trim(b, "\x00")
 
 				// unmarshal the JSON into a struct
-				sams, err := s.convertBytesToSAMs(b)
+				messages, err := s.convertBytesToMessages(b)
 				if err != nil {
 					er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
 					s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
 					return
 				}
 
-				for i := range sams {
+				for i := range messages {
 					// Fill in the value for the FromNode field, so the receiver
 					// can check this field to know where it came from.
-					sams[i].Message.FromNode = Node(s.nodeName)
+					messages[i].FromNode = Node(s.nodeName)
 
 					// Send an info message to the central about the message picked
 					// for auditing.
-					er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
+					er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, messages[i])
 					s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
 
 					// Check if it is a message to publish with Jetstream.
-					if sams[i].Message.JetstreamToNode != "" {
+					if messages[i].JetstreamToNode != "" {
 
-						s.jetstreamPublishCh <- sams[i].Message
-						er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams)
+						s.jetstreamPublishCh <- messages[i]
+						er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", messages)
 						s.errorKernel.logDebug(er)
 
 						continue
 					}
 
-					s.newMessagesCh <- sams[i]
+					s.newMessagesCh <- messages[i]
 
-					er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
+					er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", messages)
 					s.errorKernel.logDebug(er)
 				}
 
 				// Send the SAM struct to be picked up by the ring buffer.
-				s.auditLogCh <- sams
+				s.auditLogCh <- messages
 
 				// Delete the file.
 				err = os.Remove(event.Name)
 
@@ -498,23 +483,23 @@ func (s *server) readTCPListener() {
 			readBytes = bytes.Trim(readBytes, "\x00")
 
 			// unmarshal the JSON into a struct
-			sams, err := s.convertBytesToSAMs(readBytes)
+			messages, err := s.convertBytesToMessages(readBytes)
 			if err != nil {
 				er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
 				s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
 				return
 			}
 
-			for i := range sams {
+			for i := range messages {
 				// Fill in the value for the FromNode field, so the receiver
 				// can check this field to know where it came from.
-				sams[i].Message.FromNode = Node(s.nodeName)
-				s.newMessagesCh <- sams[i]
+				messages[i].FromNode = Node(s.nodeName)
+				s.newMessagesCh <- messages[i]
 			}
 
 			// Send the SAM struct to be picked up by the ring buffer.
-			s.auditLogCh <- sams
+			s.auditLogCh <- messages
 
 		}(conn)
 	}
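All of the readers above now funnel through `convertBytesToMessages`. Its input side boils down to unmarshaling YAML/JSON bytes straight into a `[]Message`, with no wrapping step afterwards. A trimmed-down, runnable sketch with `Message` reduced to a few fields:

```go
// Sketch of the unmarshal step the readers share after this patch.
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v3"
)

type Node string

type Message struct {
	ToNode     Node     `yaml:"toNode"`
	Method     string   `yaml:"method"`
	MethodArgs []string `yaml:"methodArgs"`
}

func main() {
	b := []byte(`
---
- toNode: btdev1
  method: cliCommand
  methodArgs:
    - /bin/bash
    - -c
    - echo hi
`)

	var messages []Message
	if err := yaml.Unmarshal(b, &messages); err != nil {
		log.Fatalf("malformed yaml/json: %v", err)
	}

	fmt.Printf("read %d message(s), first method: %v\n", len(messages), messages[0].Method)
}
```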
@@ -543,23 +528,23 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) {
 	readBytes = bytes.Trim(readBytes, "\x00")
 
 	// unmarshal the JSON into a struct
-	sams, err := s.convertBytesToSAMs(readBytes)
+	messages, err := s.convertBytesToMessages(readBytes)
 	if err != nil {
 		er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
 		s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
 		return
 	}
 
-	for i := range sams {
+	for i := range messages {
 		// Fill in the value for the FromNode field, so the receiver
 		// can check this field to know where it came from.
-		sams[i].Message.FromNode = Node(s.nodeName)
-		s.newMessagesCh <- sams[i]
+		messages[i].FromNode = Node(s.nodeName)
+		s.newMessagesCh <- messages[i]
 	}
 
 	// Send the SAM struct to be picked up by the ring buffer.
-	s.auditLogCh <- sams
+	s.auditLogCh <- messages
 
 }
 
@@ -583,21 +568,11 @@ func (s *server) readHttpListener() {
 	}()
 }
 
-// The subject are made up of different parts of the message field.
-// To make things easier and to avoid figuring out what the subject
-// is in all places we've created the concept of subjectAndMessage
-// (sam) where we get the subject for the message once, and use the
-// sam structure with subject alongside the message instead.
-type subjectAndMessage struct {
-	Subject `json:"subject" yaml:"subject"`
-	Message `json:"message" yaml:"message"`
-}
-
 // convertBytesToSAMs will range over the byte representing a message given in
 // json format. For each element found the Message type will be converted into
 // a SubjectAndMessage type value and appended to a slice, and the slice is
 // returned to the caller.
-func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
+func (s *server) convertBytesToMessages(b []byte) ([]Message, error) {
 	MsgSlice := []Message{}
 
 	err := yaml.Unmarshal(b, &MsgSlice)
@@ -609,22 +584,7 @@ func (s *server) convertBytesToMessages(b []byte) ([]Message, error) {
 	MsgSlice = s.checkMessageToNodes(MsgSlice)
 	s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice)))
 
-	sam := []subjectAndMessage{}
-
-	// Range over all the messages parsed from json, and create a subject for
-	// each message.
-	for _, m := range MsgSlice {
-		sm, err := newSubjectAndMessage(m)
-		if err != nil {
-			er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
-			s.errorKernel.errSend(s.processInitial, m, er, logWarning)
-
-			continue
-		}
-		sam = append(sam, sm)
-	}
-
-	return sam, nil
+	return MsgSlice, nil
 }
 
 // checkMessageToNodes will check that either toHost or toHosts are
@@ -668,37 +628,3 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
 	return msgs
 }
-
-// newSubjectAndMessage will look up the correct values and value types to
-// be used in a subject for a Message (sam), and return the a combined structure
-// of type subjectAndMessage.
-func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
-	// We need to create a tempory method type to look up the kind for the
-	// real method for the message.
-	var mt Method
-
-	tmpH := mt.getHandler(m.Method)
-	if tmpH == nil {
-		return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method)
-	}
-
-	switch {
-	case m.ToNode == "":
-		return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m)
-	case m.Method == "":
-		return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m)
-	}
-
-	sub := Subject{
-		ToNode:    string(m.ToNode),
-		Method:    m.Method,
-		messageCh: make(chan Message),
-	}
-
-	sam := subjectAndMessage{
-		Subject: sub,
-		Message: m,
-	}
-
-	return sam, nil
-}
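With `newSubjectAndMessage` deleted, the only validation left in this file is `checkMessageToNodes`-style field checking. Its body is not shown in this diff, so the sketch below is assumed from the field names and the call site, not copied from the source:

```go
// Hedged sketch of checkMessageToNodes-style filtering (logic assumed).
package main

import "fmt"

type Node string

type Message struct {
	ToNode  Node
	ToNodes []Node
	Method  string
}

func checkMessageToNodes(msgs []Message) []Message {
	keep := []Message{}
	for _, m := range msgs {
		// A message with no destination at all is dropped.
		if m.ToNode == "" && len(m.ToNodes) == 0 {
			fmt.Printf("discarding message, no toNode/toNodes: %+v\n", m)
			continue
		}
		keep = append(keep, m)
	}
	return keep
}

func main() {
	msgs := []Message{
		{ToNode: "btdev1", Method: "cliCommand"},
		{Method: "cliCommand"}, // no destination, dropped
	}
	fmt.Printf("kept %d of %d\n", len(checkMessageToNodes(msgs)), len(msgs))
}
```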
diff --git a/process.go b/process.go
index 7dd04c9..03191f6 100644
--- a/process.go
+++ b/process.go
@@ -1,44 +1,23 @@
 package ctrl
 
 import (
-	"bytes"
-	"compress/gzip"
 	"context"
 	"crypto/ed25519"
-	"encoding/gob"
 	"errors"
 	"fmt"
-	"io"
-	"os"
-	"sync"
+	"log"
 	"time"
 
-	"github.com/fxamacker/cbor/v2"
-	"github.com/klauspost/compress/zstd"
 	"github.com/nats-io/nats.go"
 	"github.com/prometheus/client_golang/prometheus"
 	// "google.golang.org/protobuf/internal/errors"
 )
 
-// processKind are either kindSubscriber or kindPublisher, and are
-// used to distinguish the kind of process to spawn and to know
-// the process kind put in the process map.
-type processKind string
-
-const (
-	processKindSubscriber processKind = "subscriber"
-	processKindPublisher  processKind = "publisher"
-)
-
 // process holds all the logic to handle a message type and it's
 // method, subscription/publishin messages for a subject, and more.
 type process struct {
 	// isSubProcess is used to indentify subprocesses spawned by other processes.
 	isSubProcess bool
-	// isLongRunningPublisher is set to true for a publisher service that should not
-	// be auto terminated like a normal autospawned publisher would be when the the
-	// inactivity timeout have expired
-	isLongRunningPublisher bool
 	// server
 	server *server
 	// messageID
@@ -50,8 +29,7 @@ type process struct {
 	// Put a node here to be able know the node a process is at.
 	node Node
 	// The processID for the current process
-	processID   int
-	processKind processKind
+	processID int
 	// methodsAvailable
 	methodsAvailable MethodsAvailable
 	// procFunc is a function that will be started when a worker process
@@ -88,7 +66,7 @@ type process struct {
 	// copy of the configuration from server
 	configuration *Configuration
 	// The new messages channel copied from *Server
-	newMessagesCh chan<- subjectAndMessage
+	newMessagesCh chan<- Message
 	// The structure who holds all processes information
 	processes *processes
 	// nats connection
@@ -104,7 +82,7 @@ type process struct {
 
 	// handler is used to directly attach a handler to a process upon
 	// creation of the process, like when a process is spawning a sub
-	// process like REQCopySrc do. If we're not spawning a sub process
+	// process like copySrc do. If we're not spawning a sub process
 	// and it is a regular process the handler to use is found with the
 	// getHandler method
 	handler func(proc process, message Message, node string) ([]byte, error)
@@ -124,7 +102,7 @@ type process struct {
 
 // prepareNewProcess will set the the provided values and the default
 // values for a process.
-func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
+func newProcess(ctx context.Context, server *server, subject Subject) process {
 	// create the initial configuration for a sessions communicating with 1 host process.
 	server.processes.mu.Lock()
 	server.processes.lastProcessID++
@@ -141,7 +119,6 @@ func newProcess(ctx context.Context, server *server, subject Subject) process {
 		subject:          subject,
 		node:             Node(server.configuration.NodeName),
 		processID:        pid,
-		processKind:      processKind,
 		methodsAvailable: method.GetMethodsAvailable(),
 		newMessagesCh:    server.newMessagesCh,
 		configuration:    server.configuration,
@@ -158,14 +135,8 @@ func newProcess(ctx context.Context, server *server, subject Subject) process {
 
 	// We use the full name of the subject to identify a unique
 	// process. We can do that since a process can only handle
-	// one message queue.
-
-	if proc.processKind == processKindPublisher {
-		proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
-	}
-	if proc.processKind == processKindSubscriber {
-		proc.processName = processNameGet(proc.subject.name(), processKindSubscriber)
-	}
+	// one request type.
+	proc.processName = processNameGet(proc.subject.name())
 
 	return proc
 }
@@ -177,24 +148,16 @@ func newProcess(ctx context.Context, server *server, subject Subject) process {
 //
 // It will give the process the next available ID, and also add the
 // process to the processes map in the server structure.
-func (p process) spawnWorker() {
+func (p process) start() {
 	// Add prometheus metrics for the process.
 	if !p.isSubProcess {
 		p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)})
 	}
 
-	// Start a publisher worker, which will start a go routine (process)
-	// That will take care of all the messages for the subject it owns.
-	if p.processKind == processKindPublisher {
-		p.startPublisher()
-	}
-
 	// Start a subscriber worker, which will start a go routine (process)
-	// That will take care of all the messages for the subject it owns.
-	if p.processKind == processKindSubscriber {
-		p.startSubscriber()
-	}
+	// to handle executing the request method defined in the message.
+	p.startSubscriber()
 
 	// Add information about the new process to the started processes map.
 	p.processes.active.mu.Lock()
@@ -205,27 +168,6 @@ func (p process) start() {
 	p.errorKernel.logDebug(er)
 }
 
-func (p process) startPublisher() {
-	// If there is a procFunc for the process, start it.
-	if p.procFunc != nil {
-		// Initialize the channel for communication between the proc and
-		// the procFunc.
-		p.procFuncCh = make(chan Message)
-
-		// Start the procFunc in it's own anonymous func so we are able
-		// to get the return error.
-		go func() {
-			err := p.procFunc(p.ctx, p.procFuncCh)
-			if err != nil {
-				er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
-				p.errorKernel.errSend(p, Message{}, er, logError)
-			}
-		}()
-	}
-
-	go p.publishMessages(p.natsConn)
-}
-
 func (p process) startSubscriber() {
 	// If there is a procFunc for the process, start it.
 	if p.procFunc != nil {
@@ -244,7 +186,7 @@ func (p process) startSubscriber() {
 		}()
 	}
 
-	p.natsSubscription = p.subscribeMessages()
+	p.natsSubscription = p.startNatsSubscriber()
 
 	// We also need to be able to remove all the information about this process
 	// when the process context is canceled.
@@ -271,26 +213,29 @@ func (p process) startSubscriber() {
 
 var (
 	ErrACKSubscribeRetry = errors.New("ctrl: retrying to subscribe for ack message")
 )
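The rewritten `publishNats` below builds a `nats.Msg` by hand. A runnable illustration of that message shape, with the ACK round trip simplified to subscribe-then-publish rather than ctrl's full retry loop; subject names are example values:

```go
// Sketch of the nats.Msg shape and the subscribe-before-publish ACK wait.
// Assumes a NATS server at 127.0.0.1:4222 and a responder on the subject.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	header := make(nats.Header)
	header["fromNode"] = []string{"btdev1"}

	msg := &nats.Msg{
		Subject: "central.cliCommand",
		Reply:   "central.cliCommand.reply",
		Data:    []byte("payload"),
		Header:  header,
	}

	// Subscribe to the reply subject first, then publish, then wait for
	// the ACK with a timeout, mirroring the loop in publishNats.
	sub, err := nc.SubscribeSync(msg.Reply)
	if err != nil {
		log.Fatal(err)
	}
	if err := nc.PublishMsg(msg); err != nil {
		log.Fatal(err)
	}
	ack, err := sub.NextMsg(3 * time.Second)
	if err != nil {
		log.Fatalf("no ack, would retry here: %v", err)
	}
	fmt.Printf("ack received: %s\n", ack.Data)
}
```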
-// messageDeliverNats will create the Nats message with headers and payload.
-// It will also take care of the delivering the message that is converted to
-// gob or cbor format as a nats.Message. It will also take care of checking
-// timeouts and retries specified for the message.
-func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
+// publishNats will create the Nats message with headers and payload.
+// The payload of the nats message, which is the ctrl message, will be
+// serialized and compressed before being put in the data field of the
+// nats message.
+// It will also take care of resending if not delivered, and timeouts.
+func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
 	retryAttempts := 0
 
 	if message.RetryWait <= 0 {
 		message.RetryWait = 0
 	}
 
+	subject := newSubject(message.Method, string(message.ToNode))
+
 	// The for loop will run until the message is delivered successfully,
 	// or that retries are reached.
 	for {
 		msg := &nats.Msg{
-			Subject: string(p.subject.name()),
+			Subject: string(subject.name()),
 			// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"),
 			// Structure of the reply message are:
 			// ...reply
-			Reply:  fmt.Sprintf("%s.reply", p.subject.name()),
+			Reply:  fmt.Sprintf("%s.reply", subject.name()),
 			Data:   natsMsgPayload,
 			Header: natsMsgHeader,
 		}
@@ -390,7 +335,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
 			switch {
 			case err == nats.ErrNoResponders || err == nats.ErrTimeout:
-				er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
+				er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, subject.name(), err)
 				p.errorKernel.logDebug(er)
 
 				time.Sleep(time.Second * time.Duration(message.RetryWait))
@@ -399,13 +344,13 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
 				return ErrACKSubscribeRetry
 
 			case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
-				er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
+				er := fmt.Errorf("error: ack receive failed: connection closed or bad subscription, will not retry message: subject=%v: %v", subject.name(), err)
 				p.errorKernel.logDebug(er)
 
 				return er
 
 			default:
-				er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", p.subject.name(), err)
+				er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", subject.name(), err)
 				p.errorKernel.logDebug(er)
 
 				return er
@@ -442,7 +387,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
 // kind of message it is and then it will check how to handle that message type,
 // and then call the correct method handler for it.
 //
-// This handler function should be started in it's own go routine,so
+// This function should be started in its own go routine, so
 // one individual handler is started per message received so we can keep
 // the state of the message being processed, and then reply back to the
 // correct sending process's reply, meaning so we ACK back to the correct
@@ -462,90 +407,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) {
 		p.errorKernel.logDebug(er)
 	}
 
-	// If compression is used, decompress it to get the gob data. If
-	// compression is not used it is the gob encoded data we already
-	// got in msgData so we do nothing with it.
-	if val, ok := msg.Header["cmp"]; ok {
-		switch val[0] {
-		case "z":
-			zr, err := zstd.NewReader(nil)
-			if err != nil {
-				er := fmt.Errorf("error: zstd NewReader failed: %v", err)
-				p.errorKernel.errSend(p, Message{}, er, logWarning)
-				return
-			}
-			msgData, err = zr.DecodeAll(msg.Data, nil)
-			if err != nil {
-				er := fmt.Errorf("error: zstd decoding failed: %v", err)
-				p.errorKernel.errSend(p, Message{}, er, logWarning)
-				zr.Close()
-				return
-			}
-
-			zr.Close()
-
-		case "g":
-			r := bytes.NewReader(msgData)
-			gr, err := gzip.NewReader(r)
-			if err != nil {
-				er := fmt.Errorf("error: gzip NewReader failed: %v", err)
-				p.errorKernel.errSend(p, Message{}, er, logError)
-				return
-			}
-
-			b, err := io.ReadAll(gr)
-			if err != nil {
-				er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
-				p.errorKernel.errSend(p, Message{}, er, logWarning)
-				return
-			}
-
-			gr.Close()
-
-			msgData = b
-		}
-	}
-
-	message := Message{}
-
-	// TODO: Jetstream
-	// Use CBOR and Compression for all messages, and drop the use of the header fields.
-
-	// Check if serialization is specified.
-	// Will default to gob serialization if nothing or non existing value is specified.
-	if val, ok := msg.Header["serial"]; ok {
-		// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
-		switch val[0] {
-		case "cbor":
-			err := cbor.Unmarshal(msgData, &message)
-			if err != nil {
-				er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
-				p.errorKernel.errSend(p, message, er, logError)
-				return
-			}
-		default: // Deaults to gob if no match was found.
-			r := bytes.NewReader(msgData)
-			gobDec := gob.NewDecoder(r)
-
-			err := gobDec.Decode(&message)
-			if err != nil {
-				er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
-				p.errorKernel.errSend(p, message, er, logError)
-				return
-			}
-		}
-
-	} else {
-		// Default to gob if serialization flag was not specified.
-		r := bytes.NewReader(msgData)
-		gobDec := gob.NewDecoder(r)
-
-		err := gobDec.Decode(&message)
-		if err != nil {
-			er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
-			p.errorKernel.errSend(p, message, er, logError)
-			return
-		}
+	message, err := p.server.messageDeserializeAndUncompress(msgData)
+	if err != nil {
+		er := fmt.Errorf("error: messageSubscriberHandler: deserialize and uncompress failed: %v", err)
+		// p.errorKernel.logDebug(er)
+		log.Fatalf("%v\n", er)
 	}
 
 	// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
@@ -778,7 +644,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
 // SubscribeMessage will register the Nats callback function for the specified
 // nats subject. This allows us to receive Nats messages for a given subject
 // on a node.
-func (p process) subscribeMessages() *nats.Subscription {
+func (p process) startNatsSubscriber() *nats.Subscription {
 	subject := string(p.subject.name())
 	// natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
 	natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
@@ -796,82 +662,6 @@ func (p process) startNatsSubscriber() *nats.Subscription {
 	return natsSubscription
 }
 
-// publishMessages will do the publishing of messages for one single
-// process. The function should be run as a goroutine, and will run
-// as long as the process it belongs to is running.
-func (p process) publishMessages(natsConn *nats.Conn) {
-	var once sync.Once
-
-	var zEnc *zstd.Encoder
-	// Prepare a zstd encoder if enabled. By enabling it here before
-	// looping over the messages to send below, we can reuse the zstd
-	// encoder for all messages.
-	switch p.configuration.Compression {
-	case "z": // zstd
-		// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
-		enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
-		if err != nil {
-			er := fmt.Errorf("error: zstd new encoder failed: %v", err)
-			p.errorKernel.logError(er)
-			os.Exit(1)
-		}
-		zEnc = enc
-		defer zEnc.Close()
-
-	}
-
-	// Adding a timer that will be used for when to remove the sub process
-	// publisher. The timer is reset each time a message is published with
-	// the process, so the sub process publisher will not be removed until
-	// it have not received any messages for the given amount of time.
-	ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
-	defer ticker.Stop()
-
-	for {
-
-		// Wait and read the next message on the message channel, or
-		// exit this function if Cancel are received via ctx.
-		select {
-		case <-ticker.C:
-			if p.isLongRunningPublisher {
-				er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
-				//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
-				p.errorKernel.logDebug(er)
-
-				continue
-			}
-
-			// We only want to remove subprocesses
-			// REMOVED 120123: Removed if so all publishers should be canceled if inactive.
-			//if p.isSubProcess {
-			p.processes.active.mu.Lock()
-			p.ctxCancel()
-			delete(p.processes.active.procNames, p.processName)
-			p.processes.active.mu.Unlock()
-
-			er := fmt.Errorf("info: canceled publisher: %v", p.processName)
-			//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
-			p.errorKernel.logDebug(er)
-
-			return
-			//}
-
-		case m := <-p.subject.messageCh:
-			ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
-			// Sign the methodArgs, and add the signature to the message.
-			m.ArgSignature = p.addMethodArgSignature(m)
-			// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
-
-			go p.publishAMessage(m, zEnc, &once, natsConn)
-		case <-p.ctx.Done():
-			er := fmt.Errorf("info: canceling publisher: %v", p.processName)
-			//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
-			p.errorKernel.logDebug(er)
-			return
-		}
-	}
-}
-
 func (p process) addMethodArgSignature(m Message) []byte {
 	argsString := argsToString(m.MethodArgs)
 	sign := ed25519.Sign(p.nodeAuth.SignPrivateKey, []byte(argsString))
 
 	return sign
 }
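`addMethodArgSignature` is unchanged but is now the only signing path for methodArgs. A runnable model of what it does, where the space-joined `argsString` stands in for ctrl's `argsToString` helper (an assumption about the joining scheme):

```go
// Stdlib-only sketch of signing methodArgs with an ed25519 node key.
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
	"log"
	"strings"
)

func main() {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		log.Fatal(err)
	}

	methodArgs := []string{"/bin/bash", "-c", "echo hi"}
	argsString := strings.Join(methodArgs, " ")

	// The sender signs with its private key and puts the signature on
	// the message (m.ArgSignature in ctrl).
	sig := ed25519.Sign(priv, []byte(argsString))

	// The receiving side verifies with the sender's public key.
	fmt.Println("signature ok:", ed25519.Verify(pub, []byte(argsString), sig))
}
```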
@@ -879,108 +669,26 @@ func (p process) startNatsSubscriber() *nats.Subscription {
-func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) {
+func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
 	// Create the initial header, and set values below depending on the
 	// various configuration options chosen.
 	natsMsgHeader := make(nats.Header)
 	natsMsgHeader["fromNode"] = []string{string(p.node)}
 
-	// The serialized value of the nats message payload
-	var natsMsgPayloadSerialized []byte
-
-	// encode the message structure into gob binary format before putting
-	// it into a nats message.
-	// Prepare a gob encoder with a buffer before we start the loop
-	switch p.configuration.Serialization {
-	case "cbor":
-		b, err := cbor.Marshal(m)
-		if err != nil {
-			er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
-			p.errorKernel.logDebug(er)
-			return
-		}
-
-		natsMsgPayloadSerialized = b
-		natsMsgHeader["serial"] = []string{p.configuration.Serialization}
-
-	default:
-		var bufGob bytes.Buffer
-		gobEnc := gob.NewEncoder(&bufGob)
-		err := gobEnc.Encode(m)
-		if err != nil {
-			er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
-			p.errorKernel.logDebug(er)
-			return
-		}
-
-		natsMsgPayloadSerialized = bufGob.Bytes()
-		natsMsgHeader["serial"] = []string{"gob"}
-	}
-
-	// Get the process name so we can look up the process in the
-	// processes map, and increment the message counter.
-	pn := processNameGet(p.subject.name(), processKindPublisher)
-
-	// The compressed value of the nats message payload. The content
-	// can either be compressed or in it's original form depening on
-	// the outcome of the switch below, and if compression were chosen
-	// or not.
-	var natsMsgPayloadCompressed []byte
-
-	// Compress the data payload if selected with configuration flag.
-	// The compression chosen is later set in the nats msg header when
-	// calling p.messageDeliverNats below.
-	switch p.configuration.Compression {
-	case "z": // zstd
-		natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
-		natsMsgHeader["cmp"] = []string{p.configuration.Compression}
-
-		// p.zEncMutex.Lock()
-		// zEnc.Reset(nil)
-		// p.zEncMutex.Unlock()
-
-	case "g": // gzip
-		var buf bytes.Buffer
-		func() {
-			gzipW := gzip.NewWriter(&buf)
-			defer gzipW.Close()
-			defer gzipW.Flush()
-			_, err := gzipW.Write(natsMsgPayloadSerialized)
-			if err != nil {
-				er := fmt.Errorf("error: failed to write gzip: %v", err)
-				p.errorKernel.logDebug(er)
-				return
-			}
-
-		}()
-
-		natsMsgPayloadCompressed = buf.Bytes()
-		natsMsgHeader["cmp"] = []string{p.configuration.Compression}
-
-	case "": // no compression
-		natsMsgPayloadCompressed = natsMsgPayloadSerialized
-		natsMsgHeader["cmp"] = []string{"none"}
-
-	default: // no compression
-		// Allways log the error to console.
-		er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
-		p.errorKernel.logDebug(er)
-
-		// We only wan't to send the error message to errorCentral once.
-		once.Do(func() {
-			p.errorKernel.logDebug(er)
-		})
-
-		// No compression, so we just assign the value of the serialized
-		// data directly to the variable used with messageDeliverNats.
-		natsMsgPayloadCompressed = natsMsgPayloadSerialized
-		natsMsgHeader["cmp"] = []string{"none"}
+	b, err := p.server.messageSerializeAndCompress(m)
+	if err != nil {
+		er := fmt.Errorf("error: publishAMessage: serialize and compress failed: %v", err)
+		p.errorKernel.logDebug(er)
+		return
 	}
 
 	// Create the Nats message with headers and payload, and do the
 	// sending of the message.
-	p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
+	p.publishNats(b, natsMsgHeader, natsConn, m)
 
+	// Get the process name so we can look up the process in the
+	// processes map, and increment the message counter.
+	pn := processNameGet(p.subject.name())
 	// Increment the counter for the next message to be sent.
 	p.messageID++
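All payloads now go through `messageSerializeAndCompress` and `messageDeserializeAndUncompress`, whose bodies are not part of this diff. Given the imports they replace (cbor and zstd), a plausible minimal version could look like the following; treat it as a guess, not the actual implementation:

```go
// Assumed round trip: cbor for serialization, zstd for compression.
package main

import (
	"fmt"
	"log"

	"github.com/fxamacker/cbor/v2"
	"github.com/klauspost/compress/zstd"
)

type Message struct {
	ToNode string
	Method string
	Data   []byte
}

func serializeAndCompress(m Message) ([]byte, error) {
	b, err := cbor.Marshal(m)
	if err != nil {
		return nil, fmt.Errorf("cbor marshal failed: %v", err)
	}
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
	if err != nil {
		return nil, err
	}
	defer enc.Close()
	return enc.EncodeAll(b, nil), nil
}

func deserializeAndUncompress(b []byte) (Message, error) {
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return Message{}, err
	}
	defer dec.Close()
	raw, err := dec.DecodeAll(b, nil)
	if err != nil {
		return Message{}, fmt.Errorf("zstd decode failed: %v", err)
	}
	var m Message
	if err := cbor.Unmarshal(raw, &m); err != nil {
		return Message{}, fmt.Errorf("cbor unmarshal failed: %v", err)
	}
	return m, nil
}

func main() {
	b, err := serializeAndCompress(Message{ToNode: "btdev1", Method: "cliCommand"})
	if err != nil {
		log.Fatal(err)
	}
	m, err := deserializeAndUncompress(b)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("round trip ok: %+v, wire size: %d bytes\n", m, len(b))
}
```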
diff --git a/processes.go b/processes.go
index 469af34..3e738da 100644
--- a/processes.go
+++ b/processes.go
@@ -92,25 +92,25 @@ func (p *processes) Start(proc process) {
 
 	// --- Subscriber services that can be started via flags
 
-	proc.startup.subscriber(proc, OpProcessList, nil)
-	proc.startup.subscriber(proc, OpProcessStart, nil)
-	proc.startup.subscriber(proc, OpProcessStop, nil)
-	proc.startup.subscriber(proc, Test, nil)
+	proc.startup.startProcess(proc, OpProcessList, nil)
+	proc.startup.startProcess(proc, OpProcessStart, nil)
+	proc.startup.startProcess(proc, OpProcessStop, nil)
+	proc.startup.startProcess(proc, Test, nil)
 
 	if proc.configuration.StartProcesses.StartSubFileAppend {
-		proc.startup.subscriber(proc, FileAppend, nil)
+		proc.startup.startProcess(proc, FileAppend, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubFile {
-		proc.startup.subscriber(proc, File, nil)
+		proc.startup.startProcess(proc, File, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubCopySrc {
-		proc.startup.subscriber(proc, CopySrc, nil)
+		proc.startup.startProcess(proc, CopySrc, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubCopyDst {
-		proc.startup.subscriber(proc, CopyDst, nil)
+		proc.startup.startProcess(proc, CopyDst, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubHello {
@@ -149,23 +149,24 @@ func (p *processes) Start(proc process) {
 			}
 		}
 
-		proc.startup.subscriber(proc, Hello, pf)
+		proc.startup.startProcess(proc, Hello, pf)
 	}
 
 	if proc.configuration.StartProcesses.IsCentralErrorLogger {
-		proc.startup.subscriber(proc, ErrorLog, nil)
+		proc.startup.startProcess(proc, ErrorLog, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubCliCommand {
-		proc.startup.subscriber(proc, CliCommand, nil)
+		proc.startup.startProcess(proc, CliCommand, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubConsole {
-		proc.startup.subscriber(proc, Console, nil)
+		proc.startup.startProcess(proc, Console, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartPubHello != 0 {
 		pf := func(ctx context.Context, procFuncCh chan Message) error {
+
 			ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
 			defer ticker.Stop()
 			for {
@@ -185,13 +186,7 @@ func (p *processes) Start(proc process) {
 					Retries:  1,
 				}
 
-				sam, err := newSubjectAndMessage(m)
-				if err != nil {
-					// In theory the system should drop the message before it reaches here.
-					er := fmt.Errorf("error: ProcessesStart: %v", err)
-					p.errorKernel.errSend(proc, m, er, logError)
-				}
-				proc.newMessagesCh <- sam
+				proc.newMessagesCh <- m
 
 				select {
 				case <-ticker.C:
@@ -203,7 +198,7 @@ func (p *processes) Start(proc process) {
 				}
 			}
 		}
-		proc.startup.publisher(proc, Hello, pf)
+		proc.startup.startProcess(proc, HelloPublisher, pf)
 	}
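The new `HelloPublisher` runs as a procFunc: a ticker-driven loop that emits one message per interval and exits when its context is canceled. A compact, runnable model of that pattern with simplified stand-in types:

```go
// Sketch of the procFunc shape used for the hello publisher.
package main

import (
	"context"
	"fmt"
	"time"
)

type Message struct {
	ToNode string
	Method string
}

func helloPublisher(ctx context.Context, interval time.Duration, out chan<- Message) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Send first, then wait for either the next tick or cancellation,
		// mirroring the loop ordering in the patched procFunc.
		out <- Message{ToNode: "central", Method: "hello"}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return fmt.Errorf("info: stopped hello publisher")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	out := make(chan Message)
	go helloPublisher(ctx, 100*time.Millisecond, out)

	for i := 0; i < 2; i++ {
		fmt.Printf("published: %+v\n", <-out)
	}
}
```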
 	if proc.configuration.StartProcesses.EnableKeyUpdates {
@@ -236,12 +231,7 @@ func (p *processes) Start(proc process) {
 			}
 			proc.nodeAuth.publicKeys.mu.Unlock()
 
-			sam, err := newSubjectAndMessage(m)
-			if err != nil {
-				// In theory the system should drop the message before it reaches here.
-				p.errorKernel.errSend(proc, m, err, logError)
-			}
-			proc.newMessagesCh <- sam
+			proc.newMessagesCh <- m
 
 			select {
 			case <-ticker.C:
@@ -253,8 +243,8 @@ func (p *processes) Start(proc process) {
 			}
 		}
 
-		proc.startup.publisher(proc, KeysRequestUpdate, pf)
-		proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
+		proc.startup.startProcess(proc, KeysRequestUpdate, pf)
+		proc.startup.startProcess(proc, KeysDeliverUpdate, nil)
 	}
 
 	if proc.configuration.StartProcesses.EnableAclUpdates {
@@ -284,13 +274,7 @@ func (p *processes) Start(proc process) {
 			}
 			proc.nodeAuth.nodeAcl.mu.Unlock()
 
-			sam, err := newSubjectAndMessage(m)
-			if err != nil {
-				// In theory the system should drop the message before it reaches here.
-				p.errorKernel.errSend(proc, m, err, logError)
-				log.Printf("error: ProcessesStart: %v\n", err)
-			}
-			proc.newMessagesCh <- sam
+			proc.newMessagesCh <- m
 
 			select {
 			case <-ticker.C:
@@ -302,41 +286,41 @@ func (p *processes) Start(proc process) {
 			}
 		}
 
-		proc.startup.publisher(proc, AclRequestUpdate, pf)
-		proc.startup.subscriber(proc, AclDeliverUpdate, nil)
+		proc.startup.startProcess(proc, AclRequestUpdate, pf)
+		proc.startup.startProcess(proc, AclDeliverUpdate, nil)
 	}
 
 	if proc.configuration.StartProcesses.IsCentralAuth {
-		proc.startup.subscriber(proc, KeysRequestUpdate, nil)
-		proc.startup.subscriber(proc, KeysAllow, nil)
-		proc.startup.subscriber(proc, KeysDelete, nil)
-		proc.startup.subscriber(proc, AclRequestUpdate, nil)
-		proc.startup.subscriber(proc, AclAddCommand, nil)
-		proc.startup.subscriber(proc, AclDeleteCommand, nil)
-		proc.startup.subscriber(proc, AclDeleteSource, nil)
-		proc.startup.subscriber(proc, AclGroupNodesAddNode, nil)
-		proc.startup.subscriber(proc, AclGroupNodesDeleteNode, nil)
-		proc.startup.subscriber(proc, AclGroupNodesDeleteGroup, nil)
-		proc.startup.subscriber(proc, AclGroupCommandsAddCommand, nil)
-		proc.startup.subscriber(proc, AclGroupCommandsDeleteCommand, nil)
-		proc.startup.subscriber(proc, AclGroupCommandsDeleteGroup, nil)
-		proc.startup.subscriber(proc, AclExport, nil)
-		proc.startup.subscriber(proc, AclImport, nil)
+		proc.startup.startProcess(proc, KeysRequestUpdate, nil)
+		proc.startup.startProcess(proc, KeysAllow, nil)
+		proc.startup.startProcess(proc, KeysDelete, nil)
+		proc.startup.startProcess(proc, AclRequestUpdate, nil)
+		proc.startup.startProcess(proc, AclAddCommand, nil)
+		proc.startup.startProcess(proc, AclDeleteCommand, nil)
+		proc.startup.startProcess(proc, AclDeleteSource, nil)
+		proc.startup.startProcess(proc, AclGroupNodesAddNode, nil)
+		proc.startup.startProcess(proc, AclGroupNodesDeleteNode, nil)
+		proc.startup.startProcess(proc, AclGroupNodesDeleteGroup, nil)
+		proc.startup.startProcess(proc, AclGroupCommandsAddCommand, nil)
+		proc.startup.startProcess(proc, AclGroupCommandsDeleteCommand, nil)
+		proc.startup.startProcess(proc, AclGroupCommandsDeleteGroup, nil)
+		proc.startup.startProcess(proc, AclExport, nil)
+		proc.startup.startProcess(proc, AclImport, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubHttpGet {
-		proc.startup.subscriber(proc, HttpGet, nil)
+		proc.startup.startProcess(proc, HttpGet, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubTailFile {
-		proc.startup.subscriber(proc, TailFile, nil)
+		proc.startup.startProcess(proc, TailFile, nil)
 	}
 
 	if proc.configuration.StartProcesses.StartSubCliCommandCont {
-		proc.startup.subscriber(proc, CliCommandCont, nil)
+		proc.startup.startProcess(proc, CliCommandCont, nil)
 	}
 
-	proc.startup.subscriber(proc, PublicKey, nil)
+	proc.startup.startProcess(proc, PublicKey, nil)
 }
 
 // Stop all subscriber processes.
@@ -367,9 +351,9 @@ func newStartup(server *server) *startup {
 	return &s
 }
 
-// subscriber will start a subscriber process. It takes the initial process, request method,
+// startProcess will start a process. It takes the initial process, request method,
 // and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
-func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
+func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
 	er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
 	p.errorKernel.logDebug(er)
 
@@ -382,23 +366,10 @@ func (s *startup) startProcess(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
 	}
 
 	fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
-	proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
+	proc := newProcess(p.ctx, p.processes.server, sub)
 	proc.procFunc = pf
 
-	go proc.spawnWorker()
-}
-
-// publisher will start a publisher process. It takes the initial process, request method,
-// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
-func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
-	er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
-	p.errorKernel.logDebug(er)
-	sub := newSubject(m, string(p.node))
-	proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
-	proc.procFunc = pf
-	proc.isLongRunningPublisher = true
-
-	go proc.spawnWorker()
+	go proc.start()
 }
 
 // ---------------------------------------------------------------
@@ -412,7 +383,7 @@ func (p *processes) printProcessesMap() {
 	p.active.mu.Lock()
 
 	for pName, proc := range p.active.procNames {
-		er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name())
+		er := fmt.Errorf("info: proc - procName in map: %v , id: %v, subject: %v", pName, proc.processID, proc.subject.name())
 		proc.errorKernel.logDebug(er)
 	}
 
diff --git a/requests.go b/requests.go
index 9fde69c..21a272e 100644
--- a/requests.go
+++ b/requests.go
@@ -104,8 +104,9 @@ const (
 	SUBCopySrc Method = "subCopySrc"
 	// Write the destination copied to some node.
 	SUBCopyDst Method = "subCopyDst"
-	// Send Hello I'm here message.
-	Hello Method = "hello"
+	// Hello I'm here message.
+	Hello          Method = "hello"
+	HelloPublisher Method = "helloPublisher"
 	// Error log methods to centralError node.
 	ErrorLog Method = "errorLog"
 	// Http Get
@@ -187,6 +188,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
 			SUBCopySrc:     HandlerFunc(methodSUB),
 			SUBCopyDst:     HandlerFunc(methodSUB),
 			Hello:          HandlerFunc(methodHello),
+			HelloPublisher: HandlerFunc(nil),
 			ErrorLog:       HandlerFunc(methodErrorLog),
 			HttpGet:        HandlerFunc(methodHttpGet),
 			HttpGetScheduled: HandlerFunc(methodHttpGetScheduled),
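Note that `HelloPublisher` maps to a nil handler above: it exists as a named method so the publisher procFunc gets its own process entry, but it is never dispatched as a request handler. A sketch of that registry shape:

```go
// Sketch of a method-to-handler registry with a publisher-only entry.
package main

import "fmt"

type Method string

type HandlerFunc func(node string) ([]byte, error)

func methodHello(node string) ([]byte, error) {
	return []byte("hello from " + node), nil
}

func main() {
	methods := map[Method]HandlerFunc{
		"hello":          methodHello,
		"helloPublisher": nil, // publisher-only, no request handler
	}

	if h, ok := methods["hello"]; ok && h != nil {
		out, _ := h("btdev1")
		fmt.Println(string(out))
	}
}
```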
@@ -352,14 +354,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
 		PreviousMessage: &thisMsg,
 	}
 
-	sam, err := newSubjectAndMessage(newMsg)
-	if err != nil {
-		// In theory the system should drop the message before it reaches here.
-		er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
-		proc.errorKernel.errSend(proc, message, er, logError)
-	}
-
-	proc.newMessagesCh <- sam
+	proc.newMessagesCh <- newMsg
 }
 
 // selectFileNaming will figure out the correct naming of the file
diff --git a/requests_cli.go b/requests_cli.go
index ef23bac..f8cb03c 100644
--- a/requests_cli.go
+++ b/requests_cli.go
@@ -55,7 +55,10 @@ func methodCliCommand(proc process, message Message, node string) ([]byte, error) {
 	// data payload there.
 	var foundEnvData bool
 	var envData string
+	//var foundEnvFile bool
+	//var envFile string
 	for i, v := range message.MethodArgs {
+		// Check if to use the content of the data field are specified.
 		if strings.Contains(v, "{{CTRL_DATA}}") {
 			foundEnvData = true
 			// Replace the found env variable placeholder with an actual env variable
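The `{{CTRL_FILE}}` expansion itself is not in this diff; only the new doc page and the commented-out `foundEnvFile` stubs above hint at it. Below is a guess at the mechanics, using a hypothetical `expandCtrlFile` helper: find the placeholder in a methodArgs element, read the named file, and splice the content in before the message is sent:

```go
// Hypothetical sketch of {{CTRL_FILE:path}} expansion; not ctrl's code.
package main

import (
	"fmt"
	"log"
	"os"
	"regexp"
)

var ctrlFileRe = regexp.MustCompile(`\{\{CTRL_FILE:([^}]+)\}\}`)

func expandCtrlFile(arg string) (string, error) {
	m := ctrlFileRe.FindStringSubmatch(arg)
	if m == nil {
		return arg, nil
	}
	content, err := os.ReadFile(m[1])
	if err != nil {
		return "", fmt.Errorf("ctrl_file read failed: %v", err)
	}
	return ctrlFileRe.ReplaceAllString(arg, string(content)), nil
}

func main() {
	f, err := os.CreateTemp("", "src")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("some: yaml")
	f.Close()

	arg := fmt.Sprintf("echo {{CTRL_FILE:%s}}>/tmp/dst.yaml", f.Name())
	out, err := expandCtrlFile(arg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out) // echo some: yaml>/tmp/dst.yaml
}
```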
@@ -260,14 +246,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // msg.Directory = dstDir // msg.FileName = dstFile - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - cancel() - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName) @@ -280,8 +259,8 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { } // newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. -func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { - p := newProcess(ctx, server, subject, processKind) +func newSubProcess(ctx context.Context, server *server, subject Subject) process { + p := newProcess(ctx, server, subject) p.isSubProcess = true return p @@ -333,7 +312,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { // previous message is then fully up and running, so we just discard // that second message in those cases. - pn := processNameGet(sub.name(), processKindSubscriber) + pn := processNameGet(sub.name()) // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) proc.processes.active.mu.Lock() @@ -352,7 +331,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { } // Create a new sub process that will do the actual file copying. - copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) + copyDstSubProc := newSubProcess(ctx, proc.server, sub) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. @@ -362,7 +341,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { copyDstSubProc.handler = copyDstSubHandler() // The process will be killed when the context expires. - go copyDstSubProc.spawnWorker() + go copyDstSubProc.start() fp := filepath.Join(cia.DstDir, cia.DstFile) replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName) @@ -445,8 +424,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel // We don't care about the error. 
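methodCopyDst above guards against starting the same sub process twice by checking the active process map first; with processKind gone the lookup key is just the subject name. A condensed sketch of that guard (the helper name is illustrative):

func alreadyRunning(proc process, sub Subject) bool {
	pn := processNameGet(sub.name())

	proc.processes.active.mu.Lock()
	defer proc.processes.active.mu.Unlock()

	_, ok := proc.processes.active.procNames[pn]
	return ok
}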
 			fi, err := os.Stat(file)
 			if err != nil {
-				fmt.Printf(" ** DEBUG: STAT ERROR: %v\n", err)
-				fmt.Printf(" ** DEBUG: fi: %#v\n", fi)
+				er := fmt.Errorf("DEBUG: ERROR while os.Stat(file): copySrcSubProcFunc, fileInfo: %v, err: %v", fi, err)
+				proc.errorKernel.errSend(proc, Message{}, er, logWarning)
+
 			}
 
 			// We want to be able to send the reply message when the copying is done,
@@ -558,15 +538,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
 				ReplyRetries:  initialMessage.ReplyRetries,
 			}
 
-			sam, err := newSubjectAndMessage(msg)
-			if err != nil {
-				er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err)
-				proc.errorKernel.errSend(proc, message, er, logWarning)
-				newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
-				return er
-			}
-
-			proc.newMessagesCh <- sam
+			proc.newMessagesCh <- msg
 
 			resendRetries = 0
 
@@ -628,15 +600,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
 				ReplyRetries:  initialMessage.ReplyRetries,
 			}
 
-			sam, err := newSubjectAndMessage(msg)
-			if err != nil {
-				er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
-				proc.errorKernel.errSend(proc, message, er, logWarning)
-				newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
-				return er
-			}
-
-			proc.newMessagesCh <- sam
+			proc.newMessagesCh <- msg
 
 			resendRetries++
 
@@ -692,14 +656,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
 			ReplyRetries: message.ReplyRetries,
 		}
 
-		sam, err := newSubjectAndMessage(msg)
-		if err != nil {
-			er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
-			proc.errorKernel.errSend(proc, message, er, logWarning)
-			return er
-		}
-
-		proc.newMessagesCh <- sam
+		proc.newMessagesCh <- msg
 	}
 
 	// Open a tmp folder for where to write the received chunks
@@ -794,14 +751,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
 					ReplyRetries: message.ReplyRetries,
 				}
 
-				sam, err := newSubjectAndMessage(msg)
-				if err != nil {
-					er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
-					proc.errorKernel.errSend(proc, message, er, logWarning)
-					return er
-				}
-
-				proc.newMessagesCh <- sam
+				proc.newMessagesCh <- msg
 
 			case copyResendLast:
 				// The csa already contains copyStatus copyResendLast when reached here,
@@ -827,14 +777,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
 					ReplyRetries: message.ReplyRetries,
 				}
 
-				sam, err := newSubjectAndMessage(msg)
-				if err != nil {
-					er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
-					proc.errorKernel.errSend(proc, message, er, logWarning)
-					return er
-				}
-
-				proc.newMessagesCh <- sam
+				proc.newMessagesCh <- msg
 
 			case copySrcDone:
 				err := func() error {
@@ -847,7 +790,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
 					proc.errorKernel.logDebug(er)
 
 					if _, err := os.Stat(cia.DstDir); os.IsNotExist(err) {
-						// TODO: Add option to set permission here ???
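Just below, the destination directory is created with the permission mode carried in the copy request itself, which is what resolves the removed TODO. A tiny sketch of the same call, assuming dstDir is the destination path and 0o755 stands in for cia.FolderPermission:

if _, err := os.Stat(dstDir); os.IsNotExist(err) {
	// The folder mode travels with the copy request and is applied here.
	if err := os.MkdirAll(dstDir, fs.FileMode(0o755)); err != nil {
		log.Fatalf("error: failed to create destination directory %v: %v", dstDir, err)
	}
}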
err := os.MkdirAll(cia.DstDir, fs.FileMode(cia.FolderPermission)) if err != nil { return fmt.Errorf("error: failed to create destination directory for file copying %v: %v", cia.DstDir, err) @@ -981,14 +923,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc ReplyRetries: message.ReplyRetries, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er, logWarning) - return er - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg } cancel() diff --git a/requests_file_handling.go b/requests_file_handling.go index 2748e31..f4d4464 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -17,8 +17,6 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error { fileName, folderTree := selectFileNaming(message, proc) file := filepath.Join(folderTree, fileName) - fmt.Printf("******************* DEBUG: CHECK IF SOCKET OR FILE: %v\n", file) - // log.Fatalf("EXITING\n") // Check the file is a unix socket, and if it is we write the @@ -70,8 +68,8 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error { // Open file and write data. f, err := os.OpenFile(file, fileFlag, 0755) if err != nil { - er := fmt.Errorf("error: methodToFile/Append: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - + er := fmt.Errorf("error: methodToFile/Append: failed to open file %v, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", file, message.Directory, message.FileName, err) + fmt.Printf("%v\n", er) return er } defer f.Close() diff --git a/requests_keys.go b/requests_keys.go index e222fb5..fea983a 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -125,10 +125,6 @@ func methodKeysRequestUpdate(proc process, message Message, node string) ([]byte func methodKeysDeliverUpdate(proc process, message Message, node string) ([]byte, error) { // Get a context with the timeout specified in message.MethodTimeout. - // TODO: - // - Since this is implemented as a NACK message we could implement a - // metric thats shows the last time keys were updated. - ctx, _ := getContextForMethodTimeout(proc.ctx, message) proc.processes.wg.Add(1) @@ -327,14 +323,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { ACKTimeout: 0, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) proc.errorKernel.logDebug(er) @@ -374,14 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { ACKTimeout: 0, } - sam, err := newSubjectAndMessage(msg) - if err != nil { - // In theory the system should drop the message before it reaches here. 
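reqWriteFileOrSocket above decides between a socket write and a file write purely from the file mode on disk. A minimal sketch of that dispatch using only the standard library, assuming message.Data holds the payload; the regular-file branch is elided:

fi, err := os.Stat(file)
if err == nil && fi.Mode().Type()&fs.ModeSocket != 0 {
	// The path is a unix socket: dial it and write the payload directly.
	c, err := net.Dial("unix", file)
	if err != nil {
		return fmt.Errorf("error: failed to dial unix socket %v: %v", file, err)
	}
	defer c.Close()

	_, err = c.Write(message.Data)
	return err
}
// Not a socket: fall through and open it as a regular file.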
- er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er, logWarning) - } - - proc.newMessagesCh <- sam + proc.newMessagesCh <- msg er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode) proc.errorKernel.logDebug(er) diff --git a/requests_operator.go b/requests_operator.go index 743a032..bd4889d 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -23,7 +23,7 @@ func methodOpProcessList(proc process, message Message, node string) ([]byte, er proc.processes.active.mu.Lock() for _, pTmp := range proc.processes.active.procNames { - s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name()) + s := fmt.Sprintf("%v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processID, pTmp.subject.name()) sb := []byte(s) out = append(out, sb...) @@ -68,8 +68,8 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) - go procNew.spawnWorker() + procNew := newProcess(proc.ctx, proc.server, sub) + go procNew.start() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf("%v", txt) @@ -115,7 +115,6 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er methodString := message.MethodArgs[0] node := message.MethodArgs[1] - kind := message.MethodArgs[2] method := Method(methodString) tmpH := mt.getHandler(Method(method)) @@ -132,7 +131,7 @@ func methodOpProcessStop(proc process, message Message, node string) ([]byte, er // We can then use this processName to get the real values for the // actual process we want to stop. sub := newSubject(method, string(node)) - processName := processNameGet(sub.name(), processKind(kind)) + processName := processNameGet(sub.name()) // Remove the process from the processes active map if found. proc.processes.active.mu.Lock() diff --git a/requests_test.go b/requests_test.go index 391355c..e76b442 100644 --- a/requests_test.go +++ b/requests_test.go @@ -93,8 +93,9 @@ func newServerForTesting(addressAndPort string, testFolder string) (*server, *Co func newNatsServerForTesting(port int) *natsserver.Server { // Start up the nats-server message broker. 
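The test broker below is created with JetStream enabled so the jetstream consumers added elsewhere in this change can be exercised against an in-process broker. A self-contained sketch of starting such an embedded server; the port and timeout values are illustrative:

package main

import (
	"log"
	"time"

	natsserver "github.com/nats-io/nats-server/v2/server"
)

func main() {
	ns, err := natsserver.NewServer(&natsserver.Options{
		Host:      "127.0.0.1",
		Port:      40222,
		JetStream: true,
	})
	if err != nil {
		log.Fatalf("failed to create nats server: %v", err)
	}

	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatalf("nats server not ready in time")
	}
	defer ns.Shutdown()

	log.Printf("embedded nats server running on %v", ns.ClientURL())
}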
 	nsOpt := &natsserver.Options{
-		Host: "127.0.0.1",
-		Port: port,
+		Host:      "127.0.0.1",
+		Port:      port,
+		JetStream: true,
 	}
 
 	ns, err := natsserver.NewServer(nsOpt)
@@ -284,12 +285,7 @@ func TestRequest(t *testing.T) {
 	for _, tt := range tests {
 		switch tt.viaSocketOrCh {
 		case viaCh:
-			sam, err := newSubjectAndMessage(tt.message)
-			if err != nil {
-				t.Fatalf("newSubjectAndMessage failed: %v\n", err)
-			}
-
-			tstSrv.newMessagesCh <- sam
+			tstSrv.newMessagesCh <- tt.message
 
 		case viaSocket:
 			msgs := []Message{tt.message}
@@ -338,6 +334,7 @@ func TestRequest(t *testing.T) {
 	checkREQTailFileTest(tstConf, t, tstTempDir)
 	checkMetricValuesTest(tstSrv, t)
 	checkErrorKernelMalformedJSONtest(tstConf, t)
+	t.Log("starting with checkREQCopySrc")
 	checkREQCopySrc(tstConf, t, tstTempDir)
 }
 
@@ -449,7 +446,8 @@ func checkREQCopySrc(conf *Configuration, t *testing.T, tmpDir string) error {
 		"methodArgs": ["` + srcfp + `","central","` + dstfp + `","20","10"],
 		"ACKTimeout":5,
 		"retries":3,
-		"methodTimeout": 10
+		"methodTimeout": 10,
+		"fileName": "filecopy2.log"
 	}
 ]`
diff --git a/server.go b/server.go
index 37ed884..7401ae3 100644
--- a/server.go
+++ b/server.go
@@ -5,15 +5,20 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"net/http"
 	"os"
 	"path/filepath"
+	"regexp"
+	"strings"
 	"sync"
 	"time"
 
+	"github.com/fxamacker/cbor/v2"
 	"github.com/jinzhu/copier"
+	"github.com/klauspost/compress/zstd"
 	"github.com/nats-io/nats.go"
 	"github.com/prometheus/client_golang/prometheus"
 )
@@ -21,9 +26,8 @@ import (
 type processName string
 
-// Will return a process name made up of subjectName+processKind
-func processNameGet(sn subjectName, pk processKind) processName {
-	pn := fmt.Sprintf("%s_%s", sn, pk)
-	return processName(pn)
+// processNameGet will return a process name made up of the subject name.
+func processNameGet(sn subjectName) processName {
+	return processName(sn)
 }
 
 // server is the structure that will hold the state about spawned
@@ -48,9 +52,9 @@ type server struct {
 	// // In general the ringbuffer will read this
 	// channel, unfold each slice, and put single messages on the buffer.
-	newMessagesCh chan subjectAndMessage
-	// directSAMSCh
-	samSendLocalCh chan []subjectAndMessage
+	newMessagesCh chan Message
+	// messageDeliverLocalCh
+	messageDeliverLocalCh chan []Message
 
 	// Channel for messages to publish with Jetstream.
jetstreamPublishCh chan Message // errorKernel is doing all the error handling like what to do if @@ -74,7 +78,8 @@ type server struct { // message ID messageID messageID // audit logging - auditLogCh chan []subjectAndMessage + auditLogCh chan []Message + zstdEncoder *zstd.Encoder } type messageID struct { @@ -114,7 +119,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { // return nil, fmt.Errorf("error: failed to create tmp seed file: %v", err) // } - err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0700) + err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0600) if err != nil { cancel() return nil, fmt.Errorf("error: failed to write temp seed file: %v", err) @@ -126,13 +131,14 @@ func NewServer(configuration *Configuration, version string) (*server, error) { return nil, fmt.Errorf("error: failed to read temp nkey seed file: %v", err) } - defer func() { - err = os.Remove(pth) - if err != nil { - cancel() - log.Fatalf("error: failed to remove temp seed file: %v\n", err) - } - }() + // // TODO: REMOVED for testing + //defer func() { + // err = os.Remove(pth) + // if err != nil { + // cancel() + // log.Fatalf("error: failed to remove temp seed file: %v\n", err) + // } + //}() case configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "": var err error @@ -212,23 +218,36 @@ func NewServer(configuration *Configuration, version string) (*server, error) { centralAuth := newCentralAuth(configuration, errorKernel) //} + zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + if err != nil { + log.Fatalf("error: zstd new encoder failed: %v", err) + } + + defer func() { + go func() { + <-ctx.Done() + zstdEncoder.Close() + }() + }() + s := server{ - ctx: ctx, - cancel: cancel, - configuration: configuration, - nodeName: configuration.NodeName, - natsConn: conn, - ctrlSocket: ctrlSocket, - newMessagesCh: make(chan subjectAndMessage), - samSendLocalCh: make(chan []subjectAndMessage), - jetstreamPublishCh: make(chan Message), - metrics: metrics, - version: version, - errorKernel: errorKernel, - nodeAuth: nodeAuth, - helloRegister: newHelloRegister(), - centralAuth: centralAuth, - auditLogCh: make(chan []subjectAndMessage), + ctx: ctx, + cancel: cancel, + configuration: configuration, + nodeName: configuration.NodeName, + natsConn: conn, + ctrlSocket: ctrlSocket, + newMessagesCh: make(chan Message), + messageDeliverLocalCh: make(chan []Message), + jetstreamPublishCh: make(chan Message), + metrics: metrics, + version: version, + errorKernel: errorKernel, + nodeAuth: nodeAuth, + helloRegister: newHelloRegister(), + centralAuth: centralAuth, + auditLogCh: make(chan []Message), + zstdEncoder: zstdEncoder, } s.processes = newProcesses(ctx, &s) @@ -340,7 +359,7 @@ func (s *server) Start() { // // The context of the initial process are set in processes.Start. sub := newSubject(Initial, s.nodeName) - s.processInitial = newProcess(context.TODO(), s, sub, "") + s.processInitial = newProcess(context.TODO(), s, sub) // Start all wanted subscriber processes. s.processes.Start(s.processInitial) @@ -358,7 +377,7 @@ func (s *server) Start() { } // Start the processing of new messages from an input channel. - s.routeMessagesToProcess() + s.routeMessagesToPublisherProcess() // Start reading the channel for injecting direct messages that should // not be sent via the message broker. 
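NewServer now creates one zstd encoder up front and shares it through the server struct instead of constructing an encoder per message. EncodeAll keeps no per-call state on the caller's side, so a single instance can serve all publishers; with WithEncoderConcurrency(1) it holds only one internal encoder. A self-contained sketch of the same pattern:

package main

import (
	"fmt"
	"log"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// One encoder for the lifetime of the program, closed on shutdown.
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
	if err != nil {
		log.Fatalf("error: zstd new encoder failed: %v", err)
	}
	defer enc.Close()

	// EncodeAll compresses a complete buffer in one call; the nil second
	// argument lets the encoder allocate the destination slice.
	compressed := enc.EncodeAll([]byte("some message payload"), nil)
	fmt.Printf("compressed to %v bytes\n", len(compressed))
}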
@@ -381,11 +400,11 @@ func (s *server) startAuditLog(ctx context.Context) {
 
 		for {
 			select {
-			case sams := <-s.auditLogCh:
+			case messages := <-s.auditLogCh:
 
-				for _, sam := range sams {
+				for _, message := range messages {
 					msgForPermStore := Message{}
-					copier.Copy(&msgForPermStore, sam.Message)
+					copier.Copy(&msgForPermStore, message)
 
 					// Remove the content of the data field.
 					msgForPermStore.Data = nil
@@ -417,27 +436,29 @@ func (s *server) directSAMSChRead() {
 			case <-s.ctx.Done():
 				log.Printf("info: stopped the directSAMSCh reader\n\n")
 				return
-			case sams := <-s.samSendLocalCh:
+			case messages := <-s.messageDeliverLocalCh:
 				// fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams)
 
 				// Range over all the sams, find the process, check if the method exists, and
 				// handle the message by starting the correct method handler.
-				for i := range sams {
-					processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
+				for i := range messages {
+					// TODO: Should the node used to build the subject here be the fromNode?
+					subject := newSubject(messages[i].Method, string(messages[i].ToNode))
+					processName := processNameGet(subject.name())
 
 					s.processes.active.mu.Lock()
 					p := s.processes.active.procNames[processName]
 					s.processes.active.mu.Unlock()
 
-					mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
+					mh, ok := p.methodsAvailable.CheckIfExists(messages[i].Method)
 					if !ok {
 						er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Method)
-						p.errorKernel.errSend(p, sams[i].Message, er, logError)
+						p.errorKernel.errSend(p, messages[i], er, logError)
 						continue
 					}
 
 					p.handler = mh
 
-					go executeHandler(p, sams[i].Message, s.nodeName)
+					go executeHandler(p, messages[i], s.nodeName)
 				}
 			}
 		}
@@ -471,7 +492,7 @@ func (s *server) Stop() {
 
 }
 
-// routeMessagesToProcess takes a database name it's input argument.
+// routeMessagesToPublisherProcess takes a database name as its input argument.
 // The database will be used as the persistent k/v store for the work
 // queue which is implemented as a ring buffer.
 // The ringBufferInCh are where we get new messages to publish.
@@ -480,7 +501,7 @@ func (s *server) Stop() {
 // worker process.
 // It will also handle the process of spawning more worker processes
 // for publisher subjects if it does not exist.
-func (s *server) routeMessagesToProcess() {
+func (s *server) routeMessagesToPublisherProcess() {
 
 	// Start reading new fresh messages received on the incomming message
 	// pipe/file.
@@ -492,117 +513,85 @@ func (s *server) routeMessagesToProcess() {
 	methodsAvailable := method.GetMethodsAvailable()
 
 	go func() {
-		for sam := range s.newMessagesCh {
+		for message := range s.newMessagesCh {
 
-			go func(sam subjectAndMessage) {
-				// TODO: Jetstream
-				// Check if Jetstream stream are specified,
-				// and send off to Jetstream publisher.
+			go func(message Message) {
 
 				s.messageID.mu.Lock()
 				s.messageID.id++
-				sam.Message.ID = s.messageID.id
+				message.ID = s.messageID.id
 				s.messageID.mu.Unlock()
 
-				s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID))
+				s.metrics.promMessagesProcessedIDLast.Set(float64(message.ID))
 
 				// Check if the format of the message is correct.
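Locally delivered messages skip the broker entirely: the subject is rebuilt from the message itself, the owning process is looked up in the active map, and the handler runs directly. Condensed from the directSAMSChRead body above, with the locking elided and m standing in for one received message:

subject := newSubject(m.Method, string(m.ToNode))
p := s.processes.active.procNames[processNameGet(subject.name())]

if mh, ok := p.methodsAvailable.CheckIfExists(m.Method); ok {
	p.handler = mh
	go executeHandler(p, m, s.nodeName)
}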
-				if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
-					er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
-					s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
+				if _, ok := methodsAvailable.CheckIfExists(message.Method); !ok {
+					er := fmt.Errorf("error: routeMessagesToPublisherProcess: the method does not exist, message dropped: %v", message.Method)
+					s.errorKernel.errSend(s.processInitial, message, er, logError)
 					return
 				}
 
 				switch {
-				case sam.Message.Retries < 0:
-					sam.Message.Retries = s.configuration.DefaultMessageRetries
+				case message.Retries < 0:
+					message.Retries = s.configuration.DefaultMessageRetries
 				}
 
-				if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
-					sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout
+				if message.MethodTimeout < 1 && message.MethodTimeout != -1 {
+					message.MethodTimeout = s.configuration.DefaultMethodTimeout
 				}
 
 				// ---
+				// Check for {{CTRL_FILE}} and if we should read and load a local file into
+				// the message before sending.
 
-				m := sam.Message
+				var filePathToOpen string
+				foundFile := false
+				var argPos int
+				for i, v := range message.MethodArgs {
+					if strings.Contains(v, "{{CTRL_FILE:") {
+						foundFile = true
+						argPos = i
 
-				subjName := sam.Subject.name()
-				pn := processNameGet(subjName, processKindPublisher)
+						// Example to split:
+						// echo {{CTRL_FILE:/somedir/msg_file.yaml}}>ctrlfile.txt
+						//
+						// Split at colon. We want the part after.
+						ss := strings.Split(v, ":")
+						// Split at "}}", so pos [0] in the result contains just the file path.
+						sss := strings.Split(ss[1], "}}")
+						filePathToOpen = sss[0]
 
-				sendOK := func() bool {
-					var ctxCanceled bool
-
-					s.processes.active.mu.Lock()
-					defer s.processes.active.mu.Unlock()
-
-					// Check if the process exist, if it do not exist return false so a
-					// new publisher process will be created.
-					proc, ok := s.processes.active.procNames[pn]
-					if !ok {
-						return false
+					}
-
-					if proc.ctx.Err() != nil {
-						ctxCanceled = true
-					}
-					if ok && ctxCanceled {
-						er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
-						s.errorKernel.logDebug(er)
-						delete(proc.processes.active.procNames, proc.processName)
-						return false
-					}
-
-					// If found in map above, and go routine for publishing is running,
-					// put the message on that processes incomming message channel.
-					if ok && !ctxCanceled {
-						select {
-						case proc.subject.messageCh <- m:
-							er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
-							s.errorKernel.logDebug(er)
-						case <-proc.ctx.Done():
-							er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
-							s.errorKernel.logDebug(er)
-						}
-
-						return true
-					}
-
-					// The process was not found, so we return false here so a new publisher
-					// process will be created later.
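The placeholder parsing above pulls out just the file path; the file read and the in-place substitution follow below. A standalone sketch of the whole {{CTRL_FILE:...}} expansion, where the argument value and the file path are illustrative:

package main

import (
	"fmt"
	"os"
	"regexp"
	"strings"
)

func main() {
	arg := "echo {{CTRL_FILE:/somedir/msg_file.yaml}}>ctrlfile.txt"

	// Everything after the first colon, up to the closing "}}", is the path.
	ss := strings.Split(arg, ":")
	filePathToOpen := strings.Split(ss[1], "}}")[0]

	b, err := os.ReadFile(filePathToOpen)
	if err != nil {
		fmt.Printf("error: failed to read the CTRL_FILE file: %v\n", err)
		return
	}

	// Splice the file content into the argument in place of the placeholder.
	re := regexp.MustCompile(`(.*)({{CTRL_FILE.*}})(.*)`)
	fmt.Println(re.ReplaceAllString(arg, `${1}`+string(b)+`${3}`))
}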
-					return false
-				}()
-
-				if sendOK {
-					return
-				}
-
-				er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
-				s.errorKernel.logDebug(er)
+				if foundFile {
+
+					fh, err := os.Open(filePathToOpen)
+					if err != nil {
+						er := fmt.Errorf("error: routeMessagesToPublisherProcess: failed to open file given as CTRL_FILE argument: %v", err)
+						s.errorKernel.logError(er)
+						return
+					}
+					defer fh.Close()
+
+					b, err := io.ReadAll(fh)
+					if err != nil {
+						er := fmt.Errorf("error: routeMessagesToPublisherProcess: failed to read file %v given as CTRL_FILE argument: %v", filePathToOpen, err)
+						s.errorKernel.logError(er)
+						return
+					}
+
+					// Replace the {{CTRL_FILE}} with the actual content read from file.
+					re := regexp.MustCompile(`(.*)({{CTRL_FILE.*}})(.*)`)
+					message.MethodArgs[argPos] = re.ReplaceAllString(message.MethodArgs[argPos], `${1}`+string(b)+`${3}`)
+					// ---
 
-				sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
-				var proc process
-				switch {
-				case m.IsSubPublishedMsg:
-					proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
-				default:
-					proc = newProcess(s.ctx, s, sub, processKindPublisher)
 				}
 
-				proc.spawnWorker()
-				er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
-				s.errorKernel.logDebug(er)
+				message.ArgSignature = s.processInitial.addMethodArgSignature(message)
 
-				// Now when the process is spawned we continue,
-				// and send the message to that new process.
-				select {
-				case proc.subject.messageCh <- m:
-					er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
-					s.errorKernel.logDebug(er)
-				case <-proc.ctx.Done():
-					er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
-					s.errorKernel.logDebug(er)
-				}
+				go s.processInitial.publishAMessage(message, s.natsConn)
 
-			}(sam)
+			}(message)
 		}
 	}()
 
@@ -632,3 +621,50 @@ func (s *server) exposeDataFolder() {
 		os.Exit(1)
 	}
 
 }
+
+// messageSerializeAndCompress will serialize and compress the Message, and
+// return the result as a []byte.
+func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {
+
+	// encode the message structure into cbor
+	bSerialized, err := cbor.Marshal(msg)
+	if err != nil {
+		er := fmt.Errorf("error: messageSerializeAndCompress: cbor encode message failed: %v", err)
+		s.errorKernel.logDebug(er)
+		return nil, er
+	}
+
+	// Compress the serialized message with zstd.
+	bCompressed := s.zstdEncoder.EncodeAll(bSerialized, nil)
+
+	return bCompressed, nil
+}
+
+// messageDeserializeAndUncompress will uncompress and deserialize the ctrl message.
+func (s *server) messageDeserializeAndUncompress(msgData []byte) (Message, error) {
+
+	zr, err := zstd.NewReader(nil)
+	if err != nil {
+		er := fmt.Errorf("error: messageDeserializeAndUncompress: zstd NewReader failed: %v", err)
+		return Message{}, er
+	}
+	msgData, err = zr.DecodeAll(msgData, nil)
+	if err != nil {
+		er := fmt.Errorf("error: messageDeserializeAndUncompress: zstd decoding failed: %v", err)
+		zr.Close()
+		return Message{}, er
+	}
+
+	zr.Close()
+
+	message := Message{}
+
+	err = cbor.Unmarshal(msgData, &message)
+	if err != nil {
+		er := fmt.Errorf("error: messageDeserializeAndUncompress: cbor decoding failed, error: %v", err)
+		return Message{}, er
+	}
+
+	return message, nil
+}
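Taken together, the two helpers form a symmetric pipeline: cbor encode then zstd compress outbound, zstd decompress then cbor decode inbound. A short usage sketch, assuming s is a fully initialized *server and Message carries the fields used elsewhere in this diff:

msg := Message{ToNode: "central", Method: Hello}

b, err := s.messageSerializeAndCompress(msg)
if err != nil {
	log.Fatalf("serialize and compress failed: %v", err)
}

got, err := s.messageDeserializeAndUncompress(b)
if err != nil {
	log.Fatalf("deserialize and uncompress failed: %v", err)
}

// A round trip should hand back the message we started with.
fmt.Printf("round trip ok, method: %v\n", got.Method)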