diff --git a/cmd/steward/main.go b/cmd/steward/main.go index 88b8814..2ed3f28 100644 --- a/cmd/steward/main.go +++ b/cmd/steward/main.go @@ -8,12 +8,16 @@ import ( "os/signal" "time" - _ "net/http/pprof" + // _ "net/http/pprof" "github.com/RaaLabs/steward" ) func main() { + // defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop() + // defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop() + // defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop() + c := steward.NewConfiguration() err := c.CheckFlags() if err != nil { diff --git a/command_event_type.go b/command_event_type.go index 246745e..cac69b1 100644 --- a/command_event_type.go +++ b/command_event_type.go @@ -48,14 +48,17 @@ const ( // Same as above, but No ACK. EventNACK CommandOrEvent = "EventNACK" // eventCommand, just wait for the ACK that the - // message is received. What action happens on the - // receiving side is up to the received to decide. + // message is received. What action happens is up to the + // received to decide. ) +// CommandOrEventAvailable are used for checking if the +// commands or events are defined. type CommandOrEventAvailable struct { topics map[CommandOrEvent]struct{} } +// Check if a command or even exists. func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent, subject Subject) bool { _, ok := co.topics[c] if ok { diff --git a/configuration_flags.go b/configuration_flags.go index 6085f55..a2ea56c 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -228,7 +228,6 @@ func (c *Configuration) CheckFlags() error { return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help") } - // NB: Disabling the config file options for now. if err := c.WriteConfigFile(); err != nil { log.Printf("error: checkFlags: failed writing config file: %v\n", err) os.Exit(1) diff --git a/errorkernel.go b/errorkernel.go index 510d850..a51c4ba 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -1,9 +1,9 @@ // The error kernel shall handle errors for a given process. -// This will be cases where the process itself where unable +// This will be cases where the process itself were unable // to handle the error on it's own, and we might need to // restart the process, or send a message back to the operator -// that the action which the message where supposed to trigger, -// or that an event where unable to be processed. +// that the action which the message where supposed to trigger +// failed, or that an event where unable to be processed. package steward diff --git a/message_and_subject.go b/message_and_subject.go index 99a8d71..3d63d13 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -61,6 +61,7 @@ type Message struct { // --- +// operation are used to specify opCmd and opArg's. type Operation struct { OpCmd string `json:"opCmd"` OpArg json.RawMessage `json:"opArg"` @@ -69,7 +70,7 @@ type Operation struct { // --- // gobEncodePayload will encode the message structure into gob -// binary format. +// binary format before putting it into a nats message. func gobEncodeMessage(m Message) ([]byte, error) { var buf bytes.Buffer gobEnc := gob.NewEncoder(&buf) @@ -83,12 +84,13 @@ func gobEncodeMessage(m Message) ([]byte, error) { // --- Subject +// Node is the type definition for the node who receive or send a message. type Node string // subject contains the representation of a subject to be used with one // specific process type Subject struct { - // node, the name of the node + // node, the name of the node to receive the message. ToNode string `json:"node" yaml:"toNode"` // messageType, command/event CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` @@ -124,6 +126,7 @@ func newSubject(method Method, node string) Subject { // subjectName is the complete representation of a subject type subjectName string +// Return a value of the subjectName for the subject as used with nats subject. func (s Subject) name() subjectName { return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent)) } diff --git a/process.go b/process.go index be8421d..f8b099b 100644 --- a/process.go +++ b/process.go @@ -22,7 +22,8 @@ const ( processKindPublisher processKind = "publisher" ) -// process are represent the communication to one individual host +// process holds all the logic to handle a message type and it's +// method, subscription/publishin messages for a subject, and more. type process struct { messageID int // the subject used for the specific process. One process @@ -189,7 +190,6 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { p.processName = pn // Add information about the new process to the started processes map. - idProcMap := make(map[int]process) idProcMap[p.processID] = p @@ -199,8 +199,9 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { } // messageDeliverNats will take care of the delivering the message -// as converted to gob format as a nats.Message. It will also take -// care of checking timeouts and retries specified for the message. +// that is converted to gob format as a nats.Message. It will also +// take care of checking timeouts and retries specified for the +// message. func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { retryAttempts := 0 @@ -263,6 +264,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // did not receive a reply, decide what to do.. retryAttempts++ log.Printf("Retry attempts:%v, retries: %v, ACKTimeout: %v\n", retryAttempts, message.Retries, message.ACKTimeout) + switch { case message.Retries == 0: // 0 indicates unlimited retries @@ -308,7 +310,9 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } + // Check if it is an ACK or NACK message, and do the appropriate action accordingly. switch { + // Check for ACK type Commands or Event. case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { @@ -342,6 +346,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, out) + // Check for NACK type Commands or Event. case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { @@ -372,7 +377,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } - // --- + default: er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) @@ -380,16 +385,15 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na } } -// Subscribe will start up a Go routine under the hood calling the -// callback function specified when a new message is received. +// SubscribeMessage will register the Nats callback function for the specified +// nats subject. This allows us to receive Nats messages for a given subject +// on a node. func (p process) subscribeMessages() *nats.Subscription { subject := string(p.subject.name()) natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { - // We start one handler per message received by using go routines here. - // This is for being able to reply back the current publisher who sent - // the message. + // Start up the subscriber handler. go p.subscriberHandler(p.natsConn, p.configuration.NodeName, msg) }) if err != nil { diff --git a/processes.go b/processes.go index e47f775..1151509 100644 --- a/processes.go +++ b/processes.go @@ -38,12 +38,14 @@ func newProcesses(ctx context.Context, promRegistry *prometheus.Registry) *proce active: make(map[processName]map[int]process), } - // Prepare the main context for the subscribers. + // Prepare the parent context for the subscribers. ctx, cancel := context.WithCancel(ctx) p.ctx = ctx p.cancel = cancel + // Register the metrics for the process. + p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ Name: "total_running_processes", Help: "The current number of total running processes", diff --git a/prometheus.go b/prometheus.go index 2920d4b..f6eb58a 100644 --- a/prometheus.go +++ b/prometheus.go @@ -28,6 +28,7 @@ func newMetrics(hostAndPort string) *metrics { return &m } +// Start the http interface for Prometheus metrics. func (m *metrics) start() error { //http.Handle("/metrics", promhttp.Handler()) diff --git a/read_socket.go b/read_socket.go index 90597e3..7613498 100644 --- a/read_socket.go +++ b/read_socket.go @@ -57,6 +57,11 @@ func (s *server) writeStewSocket(toStewSocketCh []byte) { //s.StewSockListener } +// The subject are made up of different parts of the message field. +// To make things easier and to avoid figuring out what the subject +// is in all places we've created the concept of subjectAndMessage +// (sam) where we get the subject for the message once, and use the +// sam structure with subject alongside the message instead. type subjectAndMessage struct { Subject `json:"subject" yaml:"subject"` Message `json:"message" yaml:"message"` @@ -91,7 +96,7 @@ func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) { } // newSAM will look up the correct values and value types to -// be used in a subject for a Message, and return the a combined structure +// be used in a subject for a Message (sam), and return the a combined structure // of type subjectAndMessage. func newSAM(m Message) (subjectAndMessage, error) { // We need to create a tempory method type to look up the kind for the diff --git a/ringbuffer.go b/ringbuffer.go index 54c5a1c..399d7bd 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -1,12 +1,11 @@ // Info: The idea about the ring buffer is that we have a FIFO // buffer where we store all incomming messages requested by -// operators. Each message processed will also be stored in a DB. -// -// Idea: All incomming messages should be handled from the in-memory -// buffered channel, but when they are put on the buffer they should -// also be written to the DB with a handled flag set to false. -// When a message have left the buffer the handled flag should be -// set to true. +// operators. +// Each message in process or waiting to be processed will be +// stored in a DB. When the processing of a given message is +// done it will be removed from the state db, and an entry will +// made in the persistent message log. + package steward import ( @@ -33,16 +32,23 @@ type samDBValue struct { // ringBuffer holds the data of the buffer, type ringBuffer struct { - bufData chan samDBValue - db *bolt.DB + // In memory buffer for the messages. + bufData chan samDBValue + // The database to use. + db *bolt.DB + // The current number of items in the database. totalMessagesIndex int mu sync.Mutex - permStore chan string - nodeName Node - newMessagesCh chan []subjectAndMessage + // The channel to send messages that have been processed, + // and we want to store it in the permanent message log. + permStore chan string + // Name of node. + nodeName Node + // New messages to the system to be put into the ring buffer. + newMessagesCh chan []subjectAndMessage } -// newringBuffer is a push/pop storage for values. +// newringBuffer returns a push/pop storage for values. func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { // --- // Check if socket folder exists, if not create it @@ -128,7 +134,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri } // Check for incomming messages. These are typically comming from - // the go routine who reads msg.pipe. + // the go routine who reads the socket. for v := range inCh { // Check if the command or event exists in commandOrEvent.go @@ -166,7 +172,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri js, err := json.Marshal(samV) if err != nil { - er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err) + er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) } @@ -207,7 +213,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam // We start the actual processing of an individual message here within // it's own go routine. Reason is that we don't want to block other - // messages being blocked while waiting for the done signal, or if an + // messages to be processed while waiting for the done signal, or if an // error with an individual message occurs. go func(v samDBValue) { v.Data.Message.done = make(chan struct{}) @@ -230,6 +236,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam case <-delivredCh: // OK. case <-time.After(time.Second * 5): + // TODO: Check out if more logic should be made here if messages are stuck etc. // Testing with a timeout here to figure out if messages are stuck // waiting for done signal. log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) @@ -306,7 +313,7 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) { return samDBValues, err } -// printBuckerContent will print out all they keys and values in the +// printBucketContent will print out all they keys and values in the // specified bucket. func (r *ringBuffer) printBucketContent(bucket string) error { err := r.db.View(func(tx *bolt.Tx) error { @@ -363,10 +370,10 @@ func (r *ringBuffer) getIndexValue(indexBucket string) int { return index } -// dbView will look up a specific value for a key in a bucket in a DB. +// dbView will look up and return a specific value if it exists for a key in a bucket in a DB. func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) { var value []byte - //View is a help function to get values out of the database. + // View is a help function to get values out of the database. err := db.View(func(tx *bolt.Tx) error { //Open a bucket to get key's and values from. bu := tx.Bucket([]byte(bucket)) @@ -429,9 +436,6 @@ func (r *ringBuffer) startPermanentStore() { if err != nil { log.Printf("error:failed to write entry: %v\n", err) } - - // REMOVED: time - // time.Sleep(time.Second * 1) } } diff --git a/server.go b/server.go index 879b1a4..1c5f773 100644 --- a/server.go +++ b/server.go @@ -56,6 +56,7 @@ func NewServer(c *Configuration) (*server, error) { ctx, cancel := context.WithCancel(context.Background()) var opt nats.Option + if c.RootCAPath != "" { opt = nats.RootCAs(c.RootCAPath) } @@ -70,11 +71,10 @@ func NewServer(c *Configuration) (*server, error) { } } - // Connect to the nats server, and retry until succesful. - var conn *nats.Conn const connRetryWait = 5 + // Connect to the nats server, and retry until succesful. for { var err error // Setting MaxReconnects to -1 which equals unlimited. @@ -196,7 +196,7 @@ func NewServer(c *Configuration) (*server, error) { // Start will spawn up all the predefined subscriber processes. // Spawning of publisher processes is done on the fly by checking // if there is publisher process for a given message subject, and -// not exist it will spawn one. +// if it does not exist it will spawn one. func (s *server) Start() { // Start the error kernel that will do all the error handling // that is not done within a process. @@ -242,10 +242,6 @@ func (s *server) Start() { // Will stop all processes started during startup. func (s *server) Stop() { - // TODO: Add done sync functionality within the - // stop functions so we get a confirmation that - // all processes actually are stopped. - // Stop the started pub/sub message processes. s.processes.Stop() log.Printf("info: stopped all subscribers\n") diff --git a/steward_test.go b/steward_test.go index 5105f49..544e942 100644 --- a/steward_test.go +++ b/steward_test.go @@ -40,6 +40,8 @@ func TestStewardServer(t *testing.T) { // Start Steward instance // --------------------------------------- // tempdir := t.TempDir() + + // Create the config to run a steward instance. tempdir := "./tmp" conf := &Configuration{ SocketFolder: filepath.Join(tempdir, "tmp"), @@ -200,7 +202,7 @@ func checkREQnCliCommandTest(conf *Configuration, t *testing.T) error { return nil } -// The non-sequential sending of CLI Commands. +// The continous non-sequential sending of CLI Commands. func checkREQnCliCommandContTest(conf *Configuration, t *testing.T) error { m := `[ { @@ -252,6 +254,7 @@ func checkREQHelloTest(conf *Configuration, t *testing.T) error { return nil } +// Check the error logger type. func checkREQErrorLogTest(conf *Configuration, t *testing.T) error { m := `[ { @@ -275,6 +278,7 @@ func checkREQErrorLogTest(conf *Configuration, t *testing.T) error { return nil } +// Check http get method. func checkREQHttpGetTest(conf *Configuration, t *testing.T) error { // Web server for testing. { @@ -313,8 +317,7 @@ func checkREQHttpGetTest(conf *Configuration, t *testing.T) error { return nil } -// --- - +// Check the tailing of files type. func checkREQTailFileTest(conf *Configuration, t *testing.T) error { // Create a file with some content. fh, err := os.OpenFile("test.file", os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 470e054..e4cc1f6 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -57,7 +57,7 @@ type Method string // The constants that will be used throughout the system for // when specifying what kind of Method to send or work with. const ( - // Initial method used to start other processes. + // Initial parent method used to start other processes. REQInitial Method = "REQInitial" // Command for client operation request of the system. The op // command to execute shall be given in the data field of the @@ -136,7 +136,7 @@ const ( func (m Method) GetMethodsAvailable() MethodsAvailable { // Command, Used to make a request to perform an action - // Event, Used to communicate that an action has been performed. + // Event, Used to communicate that something have happened. ma := MethodsAvailable{ Methodhandlers: map[Method]methodHandler{ REQInitial: methodREQInitial{ @@ -214,6 +214,7 @@ func (m Method) getHandler(method Method) methodHandler { // ---- +// Initial parent method used to start other processes. type methodREQInitial struct { commandOrEvent CommandOrEvent } @@ -229,6 +230,9 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([ } // ---- + +// MethodsAvailable holds a map of all the different method types and the +// associated handler to that method type. type MethodsAvailable struct { Methodhandlers map[Method]methodHandler } @@ -247,10 +251,51 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { } } +// Create a new message for the reply containing the output of the +// action executed put in outData, and put it on the ringbuffer to +// be published. +// The method to use for the reply message should initially be +// specified within the first message as the replyMethod, and we will +// pick up that value here, and use it as the method for the new +// request message. If no replyMethod is set we default to the +// REQToFileAppend method type. +func newReplyMessage(proc process, message Message, outData []byte) { + + // If no replyMethod is set we default to writing to writing to + // a log file. + if message.ReplyMethod == "" { + message.ReplyMethod = REQToFileAppend + } + + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{string(outData)}, + Method: message.ReplyMethod, + ACKTimeout: message.ReplyACKTimeout, + Retries: message.ReplyRetries, + + // Put in a copy of the initial request message, so we can use it's properties if + // needed to for example create the file structure naming on the subscriber. + PreviousMessage: &message, + } + + nSAM, err := newSAM(newMsg) + if err != nil { + // In theory the system should drop the message before it reaches here. + er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + } + proc.toRingbufferCh <- []subjectAndMessage{nSAM} +} + // ------------------------------------------------------------ // Subscriber method handlers // ------------------------------------------------------------ +// The methodHandler interface. type methodHandler interface { handler(proc process, message Message, node string) ([]byte, error) getKind() CommandOrEvent @@ -310,7 +355,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri proc.processes.mu.Unlock() case "startProc": - // Set the interface type dst to &OpStart. + // Set the empty interface type dst to &OpStart. dst = &OpCmdStartProc{} err := json.Unmarshal(message.Operation.OpArg, &dst) @@ -361,8 +406,8 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri // Assert it into the correct non pointer value. arg := *dst.(*OpCmdStopProc) - // Based on the arg values received in the message we create can - // create a processName structure as used in naming the real processes. + // Based on the arg values received in the message we create a + // processName structure as used in naming the real processes. // We can then use this processName to get the real values for the // actual process we want to stop. sub := newSubject(arg.Method, string(arg.RecevingNode)) @@ -387,6 +432,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri proc.processes.mu.Lock() + // Remove the process from the processes active map if found. toStopProc, ok := proc.processes.active[processName][arg.ID] if ok { // Delete the process from the processes map @@ -429,47 +475,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri return ackMsg, nil } -//-- -// Create a new message for the reply containing the output of the -// action executed put in outData, and put it on the ringbuffer to -// be published. -// The method to use for the reply message should initially be -// specified within the first message as the replyMethod, and we will -// pick up that value here, and use it as the method for the new -// request message. If no replyMethod is set we default to the -// REQToFileAppend method type. -func newReplyMessage(proc process, message Message, outData []byte) { - - // If no replyMethod is set we default to writing to writing to - // a log file. - if message.ReplyMethod == "" { - message.ReplyMethod = REQToFileAppend - } - //-- - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(outData)}, - Method: message.ReplyMethod, - ACKTimeout: message.ReplyACKTimeout, - Retries: message.ReplyRetries, - - // Put in a copy of the initial request message, so we can use it's properties if - // needed to for example create the file structure naming on the subscriber. - PreviousMessage: &message, - } - - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) - } - proc.toRingbufferCh <- []subjectAndMessage{nSAM} - //-- -} +//---- type methodREQToFileAppend struct { commandOrEvent CommandOrEvent @@ -479,6 +485,7 @@ func (m methodREQToFileAppend) getKind() CommandOrEvent { return m.commandOrEvent } +// Handle appending data to file. func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) { // If it was a request type message we want to check what the initial messages @@ -546,6 +553,8 @@ func (m methodREQToFile) getKind() CommandOrEvent { return m.commandOrEvent } +// Handle writing to a file. Will truncate any existing data if the file did already +// exist. func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) { // If it was a request type message we want to check what the initial messages @@ -614,10 +623,10 @@ func (m methodREQHello) getKind() CommandOrEvent { return m.commandOrEvent } +// Handler for receiving hello messages. func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) { data := fmt.Sprintf("Received hello from %#v\n", message.FromNode) - // -------------------------- fileName := fmt.Sprintf("%v.%v%v", message.FromNode, message.Method, message.FileExtension) folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode)) @@ -667,11 +676,10 @@ func (m methodREQErrorLog) getKind() CommandOrEvent { return m.commandOrEvent } +// Handle the writing of error logs. func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) - // -- - // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. var fileName string @@ -719,8 +727,6 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) ( ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil - - // -- } // --- @@ -733,7 +739,9 @@ func (m methodREQPing) getKind() CommandOrEvent { return m.commandOrEvent } +// Handle receving a ping. func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) { + // TODO: Replace this with an append to file on receival. log.Printf("<--- PING REQUEST received from: %v, containing: %v", message.FromNode, message.Data) proc.processes.wg.Add(1) @@ -759,7 +767,9 @@ func (m methodREQPong) getKind() CommandOrEvent { return m.commandOrEvent } +// Handle receiving a pong. func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) { + // TODO: Replace this with an append to file on receival. log.Printf("<--- ECHO Reply received from: %v, containing: %v", message.FromNode, message.Data) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -916,6 +926,7 @@ func (m methodREQToConsole) getKind() CommandOrEvent { return m.commandOrEvent } +// Handler to write directly to console. func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) { fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data) @@ -933,7 +944,7 @@ func (m methodREQHttpGet) getKind() CommandOrEvent { return m.commandOrEvent } -// handler to run a Http Get. +// handler to do a Http Get. func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) @@ -1190,6 +1201,8 @@ func (m methodREQToSocket) getKind() CommandOrEvent { return m.commandOrEvent } +// TODO: +// Handler to write to unix socket file. func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) { for _, d := range message.Data {