2021-02-24 14:43:31 +00:00
package steward
import (
"fmt"
2021-04-03 05:33:03 +00:00
"log"
"os"
2021-02-24 14:43:31 +00:00
)
// --- Message
type Message struct {
2021-12-29 07:29:11 +00:00
_ struct { } ` cbor:",toarray" `
2021-08-25 14:17:33 +00:00
// The node to send the message to.
2021-06-29 06:21:42 +00:00
ToNode Node ` json:"toNode" yaml:"toNode" `
2021-09-09 07:54:33 +00:00
// ToNodes to specify several hosts to send message to in the
// form of an slice/array.
2022-06-22 12:32:58 +00:00
// The ToNodes field is only a concept that exists when messages
// are injected f.ex. on a socket, and there they are directly
//converted into separate node messages for each node, and from
// there the ToNodes field is not used any more within the system.
// With other words, a message that exists within Steward is always
// for just for a single node.
2021-08-25 14:17:33 +00:00
ToNodes [ ] Node ` json:"toNodes,omitempty" yaml:"toNodes,omitempty" `
2021-02-24 14:43:31 +00:00
// The Unique ID of the message
ID int ` json:"id" yaml:"id" `
2021-09-09 07:54:33 +00:00
// The actual data in the message. This is typically where we
// specify the cli commands to execute on a node, and this is
// also the field where we put the returned data in a reply
// message.
2022-01-31 07:49:46 +00:00
Data [ ] byte ` json:"data" yaml:"data" `
2021-09-09 07:54:33 +00:00
// Method, what request type to use, like REQCliCommand, REQHttpGet..
2021-04-13 11:50:00 +00:00
Method Method ` json:"method" yaml:"method" `
2021-09-16 09:51:34 +00:00
// Additional arguments that might be needed when executing the
// method. Can be f.ex. an ip address if it is a tcp sender, or the
// shell command to execute in a cli session.
MethodArgs [ ] string ` json:"methodArgs" yaml:"methodArgs" `
2022-01-31 09:06:14 +00:00
// ArgSignature is the ed25519 signature of the methodArgs.
ArgSignature [ ] byte ` json:"argSignature" yaml:"argSignature" `
2021-04-13 11:50:00 +00:00
// ReplyMethod, is the method to use for the reply message.
// By default the reply method will be set to log to file, but
// you can override it setting your own here.
ReplyMethod Method ` json:"replyMethod" yaml:"replyMethod" `
2021-09-16 09:51:34 +00:00
// Additional arguments that might be needed when executing the reply
// method. Can be f.ex. an ip address if it is a tcp sender, or the
// shell command to execute in a cli session.
ReplyMethodArgs [ ] string ` json:"replyMethodArgs" yaml:"replyMethodArgs" `
2021-09-22 14:08:55 +00:00
// IsReply are used to tell that this is a reply message. By default
// the system sends the output of a request method back to the node
// the message originated from. If it is a reply method we want the
// result of the reply message to be sent to the central server, so
// we can use this value if set to swap the toNode, and fromNode
// fields.
IsReply bool ` json:"isReply" yaml:"isReply" `
2021-04-13 11:50:00 +00:00
// From what node the message originated
2022-01-26 08:23:02 +00:00
FromNode Node ` json:"fromNode" yaml:"fromNode" `
2021-04-15 08:33:44 +00:00
// ACKTimeout for waiting for an ack message
ACKTimeout int ` json:"ACKTimeout" yaml:"ACKTimeout" `
2022-06-16 22:39:15 +00:00
// RetryWait specified the time in seconds to wait between retries.
RetryWait int ` json:"retryWait" yaml:"retryWait" `
2022-06-17 22:03:25 +00:00
// IsSubPublishedMsg enables timeout of publishing process, and is used together with process.isSubProcess to be able to terminate the sub processes publishers.
IsSubPublishedMsg bool ` json:"isSubPublishedMsg" yaml:"isSubPublishedMsg" `
2021-04-15 08:33:44 +00:00
// Resend retries
2021-03-11 11:07:09 +00:00
Retries int ` json:"retries" yaml:"retries" `
2021-04-15 08:33:44 +00:00
// The ACK timeout of the new message created via a request event.
2021-04-15 08:52:38 +00:00
ReplyACKTimeout int ` json:"replyACKTimeout" yaml:"replyACKTimeout" `
2021-03-11 16:14:43 +00:00
// The retries of the new message created via a request event.
2021-04-13 12:37:17 +00:00
ReplyRetries int ` json:"replyRetries" yaml:"replyRetries" `
2021-03-11 16:14:43 +00:00
// Timeout for long a process should be allowed to operate
MethodTimeout int ` json:"methodTimeout" yaml:"methodTimeout" `
2021-09-16 10:37:46 +00:00
// Timeout for long a process should be allowed to operate
ReplyMethodTimeout int ` json:"replyMethodTimeout" yaml:"replyMethodTimeout" `
2021-04-07 05:00:40 +00:00
// Directory is a string that can be used to create the
//directory structure when saving the result of some method.
// For example "syslog","metrics", or "metrics/mysensor"
// The type is typically used in the handler of a method.
Directory string ` json:"directory" yaml:"directory" `
2021-09-09 07:54:33 +00:00
// FileName is used to be able to set a wanted name
2021-04-07 05:00:40 +00:00
// on a file being saved as the result of data being handled
// by a method handler.
2021-08-24 12:05:44 +00:00
FileName string ` json:"fileName" yaml:"fileName" `
2021-04-03 04:31:48 +00:00
// PreviousMessage are used for example if a reply message is
2021-06-17 07:36:42 +00:00
// generated and we also need a copy of the details of the the
2021-04-09 16:20:04 +00:00
// initial request message.
2021-04-03 04:31:48 +00:00
PreviousMessage * Message
2022-06-20 09:17:23 +00:00
// Schedule
Schedule [ ] int ` json:"schedule" yaml:"schedule" `
2021-04-03 04:31:48 +00:00
2021-02-24 14:43:31 +00:00
// done is used to signal when a message is fully processed.
2021-04-03 04:31:48 +00:00
// This is used for signaling back to the ringbuffer that we are
// done with processing a message, and the message can be removed
// from the ringbuffer and into the time series log.
2021-03-11 16:14:43 +00:00
done chan struct { }
2022-06-20 10:28:28 +00:00
// ctx for the specifix message. Used for for example canceling
// scheduled messages.
// NB: Commented out this field for specific message context
// to be used within handlers, since it will override the structure
// we have today. Keeping the code for a bit incase it makes sense
// to implement later.
//ctx context.Context
2021-02-24 14:43:31 +00:00
}
// --- Subject
2021-08-16 11:01:12 +00:00
// Node is the name of a node that receives or sends a message.
type Node string
2021-02-24 14:43:31 +00:00
// subject contains the representation of a subject to be used with one
// specific process
type Subject struct {
2021-08-16 11:01:12 +00:00
// node, the name of the node to receive the message.
2021-02-24 14:43:31 +00:00
ToNode string ` json:"node" yaml:"toNode" `
2022-01-27 13:25:24 +00:00
// Event, event type like EventACK or EventNACK.
2022-01-27 09:06:06 +00:00
Event Event ` json:"event" yaml:"event" `
2021-03-01 11:16:36 +00:00
// method, what is this message doing, etc. CLICommand, Syslog, etc.
2021-02-24 14:43:31 +00:00
Method Method ` json:"method" yaml:"method" `
2021-03-09 06:43:55 +00:00
// messageCh is used by publisher kind processes to read new messages
// to be published. The content on this channel have been routed here
// from routeMessagesToPublish in *server.
// This channel is only used for publishing processes.
2021-02-24 14:43:31 +00:00
messageCh chan Message
}
// newSubject will return a new variable of the type subject, and insert
2022-06-07 09:52:30 +00:00
// all the values given as arguments. It will create the channel
2021-02-24 14:43:31 +00:00
// to receive new messages on the specific subject.
2022-06-07 09:52:30 +00:00
// The function will also verify that there is a methodHandler defined
// for the Request type.
2021-04-03 05:33:03 +00:00
func newSubject ( method Method , node string ) Subject {
2022-01-27 09:06:06 +00:00
// Get the Event type for the Method.
2021-04-03 05:33:03 +00:00
ma := method . GetMethodsAvailable ( )
2022-06-14 05:05:38 +00:00
mh , ok := ma . CheckIfExists ( method )
//mh, ok := ma.Methodhandlers[method]
2021-04-03 05:33:03 +00:00
if ! ok {
2023-01-12 11:01:01 +00:00
log . Printf ( "error: newSubject: no Event type specified for the method: %v\n" , method )
2021-04-03 05:33:03 +00:00
os . Exit ( 1 )
}
2021-02-24 14:43:31 +00:00
return Subject {
2022-01-27 09:06:06 +00:00
ToNode : node ,
Event : mh . getKind ( ) ,
Method : method ,
messageCh : make ( chan Message ) ,
2021-02-24 14:43:31 +00:00
}
}
2022-06-07 09:52:30 +00:00
// newSubjectNoVerifyHandler builds a Subject from the given method and node
// and gives it a fresh unbuffered channel for receiving messages on that
// subject.
// Unlike newSubject, no lookup of a method handler is done; the Event of
// the returned subject is always EventACK.
func newSubjectNoVerifyHandler(method Method, node string) Subject {
	var sub Subject

	sub.ToNode = node
	// No handler is consulted, so the event type is fixed.
	sub.Event = EventACK
	sub.Method = method
	sub.messageCh = make(chan Message)

	return sub
}
2021-02-24 14:43:31 +00:00
// subjectName is the complete string representation of a subject, as
// returned by the name method of Subject.
type subjectName string
2021-08-16 11:01:12 +00:00
// Return a value of the subjectName for the subject as used with nats subject.
2021-02-24 14:43:31 +00:00
func ( s Subject ) name ( ) subjectName {
2022-01-27 09:06:06 +00:00
return subjectName ( fmt . Sprintf ( "%s.%s.%s" , s . ToNode , s . Method , s . Event ) )
2021-02-24 14:43:31 +00:00
}