From ed00f247ae849559a7ed010e92a0dda37b39797c Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 29 Mar 2021 13:36:30 +0200 Subject: [PATCH] unix socket for new mesages, changed naming of channels --- .gitignore | 1 + README.md | 18 ++-- example/toShip1-CLICommandRequest1.json | 2 +- go.mod | 1 - go.sum | 3 - process.go | 36 +++---- read_socket.go | 138 +++++++----------------- ringbuffer.go | 2 +- server.go | 36 +++---- startup_processes.go | 22 ++-- steward.sock | 0 subscriber_method_types.go | 12 +-- 12 files changed, 101 insertions(+), 170 deletions(-) delete mode 100644 steward.sock diff --git a/.gitignore b/.gitignore index f44921a..3b0fc2e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ tmp/ incommmingBuffer.db store.log changes.md +steward.sock diff --git a/README.md b/README.md index ca33276..4532496 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,7 @@ methodTimeout ### How to send a Message -Right now the API for sending a message from one node to another node is by pasting a structured JSON object into a file called `steward.sock` living alongside the binary. This file will be watched continously, and when updated the content will be picked up, umarshaled, and if OK it will be sent a message to the node specified in the `toNode` field. +Right now the API for sending a message from one node to another node is by pasting a structured JSON object into a file called `msg.pipe` living alongside the binary. This file will be watched continously, and when updated the content will be picked up, umarshaled, and if OK it will be sent a message to the node specified in the `toNode` field. The `method` is what defines what the event will do. The preconfigured methods are: @@ -263,7 +263,7 @@ NB: Both the keys and the values used are case sensitive. #### Sending a command from one Node to Another Node -Example JSON for appending a message of type command into the `steward.sock` file +Example JSON for appending a message of type command into the `msg.pipe` file ```json [ @@ -317,11 +317,11 @@ To send a message with custom timeout and amount of retries ] ``` -You can save the content to myfile.JSON and append it to `steward.sock` +You can save the content to myfile.JSON and append it to `msg.pipe` -`cat myfile.json >> steward.sock` +`cat myfile.json >> msg.pipe` -The content of `steward.sock` will be erased as messages a processed. +The content of `msg.pipe` will be erased as messages a processed. #### Sending a message of type Event @@ -336,11 +336,11 @@ The content of `steward.sock` will be erased as messages a processed. ] ``` -You can save the content to myfile.JSON and append it to `steward.sock` +You can save the content to myfile.JSON and append it to `msg.pipe` -`cat myfile.json >> steward.sock` +`cat myfile.json >> msg.pipe` -The content of `steward.sock` will be erased as messages a processed. +The content of `msg.pipe` will be erased as messages a processed. ## Concepts/Ideas @@ -369,7 +369,7 @@ and for a shell command of type command to a host named "ship2" ## TODO -- FIX so it can handle multiple slices of input for steward.sock +- FIX so it can handle multiple slices of input for msg.pipe - Make a scraper that first send an EventACK, and the content of the scraping is returned by a node as a new EventACK back the where the initial event originated. diff --git a/example/toShip1-CLICommandRequest1.json b/example/toShip1-CLICommandRequest1.json index e8a67d5..129851a 100644 --- a/example/toShip1-CLICommandRequest1.json +++ b/example/toShip1-CLICommandRequest1.json @@ -2,7 +2,7 @@ { "toNode": "ship1", - "data": ["bash","-c","sleep 3 & echo 'apekatt'"], + "data": ["bash","-c","sleep 3 & tree ./"], "method":"CLICommandRequest", "timeout":10, "retries":3, diff --git a/go.mod b/go.mod index 30cf3a0..ea136d0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/RaaLabs/steward go 1.15 require ( - github.com/fsnotify/fsnotify v1.4.9 github.com/nats-io/nats-server/v2 v2.1.9 // indirect github.com/nats-io/nats.go v1.10.0 github.com/pelletier/go-toml v1.8.1 diff --git a/go.sum b/go.sum index 1e2d6a9..fcced13 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -342,7 +340,6 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/process.go b/process.go index 09fd16e..e9f32f1 100644 --- a/process.go +++ b/process.go @@ -55,12 +55,12 @@ type process struct { // copy of the configuration from server configuration *Configuration // The new messages channel copied from *Server - newMessagesCh chan<- []subjectAndMessage + toRingbufferCh chan<- []subjectAndMessage } // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { +func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -81,7 +81,7 @@ func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage, processKind: processKind, allowedReceivers: m, methodsAvailable: method.GetMethodsAvailable(), - newMessagesCh: newMessagesCh, + toRingbufferCh: toRingbufferCh, configuration: configuration, } @@ -135,7 +135,7 @@ func (p process) spawnWorker(s *server) { err := p.procFunc() if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) - sendErrorLogMessage(p.newMessagesCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) } }() } @@ -155,7 +155,7 @@ func (p process) spawnWorker(s *server) { err := p.procFunc() if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) - sendErrorLogMessage(p.newMessagesCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) } }() } @@ -174,7 +174,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { dataPayload, err := gobEncodeMessage(message) if err != nil { er := fmt.Errorf("error: createDataPayload: %v", err) - sendErrorLogMessage(p.newMessagesCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) continue } @@ -195,7 +195,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { subReply, err := natsConn.SubscribeSync(msg.Reply) if err != nil { er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err) - sendErrorLogMessage(p.newMessagesCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) continue } @@ -203,7 +203,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { err = natsConn.PublishMsg(msg) if err != nil { er := fmt.Errorf("error: publish failed: %v", err) - sendErrorLogMessage(p.newMessagesCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) continue } @@ -217,7 +217,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) if err != nil { er := fmt.Errorf("error: subReply.NextMsg failed for node=%v, subject=%v: %v", p.node, p.subject.name(), err) - sendErrorLogMessage(p.newMessagesCh, message.FromNode, er) + sendErrorLogMessage(p.toRingbufferCh, message.FromNode, er) // did not receive a reply, decide what to do.. retryAttempts++ @@ -229,7 +229,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { case retryAttempts >= message.Retries: // max retries reached er := fmt.Errorf("info: max retries for message reached, breaking out: %v", message) - sendErrorLogMessage(p.newMessagesCh, message.FromNode, er) + sendErrorLogMessage(p.toRingbufferCh, message.FromNode, er) return default: @@ -264,7 +264,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed: %v", err) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } switch { @@ -272,7 +272,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } out := []byte("not allowed from " + message.FromNode) @@ -291,11 +291,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } // Send a confirmation message back to the publisher @@ -305,7 +305,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } // Check if we are allowed to receive from that host @@ -325,16 +325,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } // --- default: er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) + sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) } } diff --git a/read_socket.go b/read_socket.go index 77303be..bfcea5d 100644 --- a/read_socket.go +++ b/read_socket.go @@ -1,57 +1,65 @@ package steward import ( - "bufio" + "bytes" "encoding/json" "fmt" - "io" "log" + "net" "os" - - "github.com/fsnotify/fsnotify" ) -// getMessagesFromFile will start a file watcher for the given directory -// and filename. It will take a channel of []byte as input, and it is -// in this channel the content of a file that has changed is returned. -func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, inputFromFileCh chan []subjectAndMessage) { - fileUpdated := make(chan bool) - go fileWatcherStart(directoryToCheck, fileUpdated) +// readSocket will read the .sock file specified. +// It will take a channel of []byte as input, and it is in this +// channel the content of a file that has changed is returned. +func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { + err := os.Remove("steward.sock") + if err != nil { + log.Printf("error: could not delete sock file: %v\n", err) + } - for range fileUpdated { + l, err := net.Listen("unix", "steward.sock") + if err != nil { + log.Printf("error: failed to open socket: %v\n", err) + os.Exit(1) + } - //load file, read it's content - b, err := readTruncateMessageFile(fileName) + // Loop, and wait for new connections. + for { + conn, err := l.Accept() if err != nil { - log.Printf("error: reading file: %v", err) + er := fmt.Errorf("error: failed to accept conn on socket: %v", err) + sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) } - // Start on top again if the file did not contain - // any data. - if len(b) == 0 { + b := make([]byte, 65535) + _, err = conn.Read(b) + if err != nil { + er := fmt.Errorf("error: failed to read data from socket: %v", err) + sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) continue } + b = bytes.Trim(b, "\x00") + // unmarshal the JSON into a struct - js, err := jsonFromFileData(b) + sam, err := convertBytesToSAM(b) if err != nil { er := fmt.Errorf("error: malformed json: %v", err) - sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) + sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) continue } - for i := range js { - fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent) + for i := range sam { + // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - js[i].Message.FromNode = node(s.nodeName) + sam[i].Message.FromNode = node(s.nodeName) } - // Send the data back to be consumed - inputFromFileCh <- js + // Send the SAM struct to be picked up by the ring buffer. + toRingbufferCh <- sam } - er := fmt.Errorf("error: getMessagesFromFile stopped") - sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) } type subjectAndMessage struct { @@ -59,10 +67,11 @@ type subjectAndMessage struct { Message `json:"message" yaml:"message"` } -// jsonFromFileData will range over the message given in json format. For -// each element found the Message type will be converted into a SubjectAndMessage -// type value and appended to a slice, and the slice is returned to the caller. -func jsonFromFileData(b []byte) ([]subjectAndMessage, error) { +// convertBytesToSAM will range over the byte representing a message given in +// json format. For each element found the Message type will be converted into +// a SubjectAndMessage type value and appended to a slice, and the slice is +// returned to the caller. +func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) { MsgSlice := []Message{} err := json.Unmarshal(b, &MsgSlice) @@ -114,72 +123,3 @@ func newSAM(m Message) (subjectAndMessage, error) { return sm, nil } - -// readTruncateMessageFile, will read all the messages in the given -// file, and truncate the file after read. -// A []byte will be returned with the content read. -func readTruncateMessageFile(fileName string) ([]byte, error) { - - f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend) - if err != nil { - log.Printf("error: readTruncateMessageFile: Failed to open file: %v\n", err) - return nil, err - } - defer f.Close() - - scanner := bufio.NewScanner(f) - - lines := []byte{} - - for scanner.Scan() { - lines = append(lines, scanner.Bytes()...) - } - - // empty the file after all is read - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return nil, fmt.Errorf("f.Seek failed: %v", err) - } - - err = f.Truncate(0) - if err != nil { - return nil, fmt.Errorf("f.Truncate failed: %v", err) - } - - return lines, nil -} - -// Start the file watcher that will check if the in pipe for new operator -// messages are updated with new content. -func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Println("Failed fsnotify.NewWatcher") - return - } - defer watcher.Close() - - done := make(chan bool) - go func() { - //Give a true value to updated so it reads the file the first time. - fileUpdated <- true - for { - select { - case event := <-watcher.Events: - if event.Op&fsnotify.Write == fsnotify.Write { - // log.Println("info: steward.sock file updated, processing input: ", event.Name) - //testing with an update chan to get updates - fileUpdated <- true - } - case err := <-watcher.Errors: - log.Println("error:", err) - } - } - }() - - err = watcher.Add(directoryToCheck) - if err != nil { - log.Printf("error: watcher add: %v\n", err) - } - <-done -} diff --git a/ringbuffer.go b/ringbuffer.go index a7d29ca..475b8c3 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -109,7 +109,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri } // Check for incomming messages. These are typically comming from - // the go routine who reads steward.sock. + // the go routine who reads msg.pipe. for v := range inCh { // Check if the command or event exists in commandOrEvent.go diff --git a/server.go b/server.go index c635b3e..cbcbfde 100644 --- a/server.go +++ b/server.go @@ -51,7 +51,7 @@ type server struct { // The name of the node nodeName string // Mutex for locking when writing to the process map - newMessagesCh chan []subjectAndMessage + toRingbufferCh chan []subjectAndMessage // errorKernel is doing all the error handling like what to do if // an error occurs. errorKernel *errorKernel @@ -67,12 +67,12 @@ func NewServer(c *Configuration) (*server, error) { } s := &server{ - configuration: c, - nodeName: c.NodeName, - natsConn: conn, - processes: newProcesses(), - newMessagesCh: make(chan []subjectAndMessage), - metrics: newMetrics(c.PromHostAndPort), + configuration: c, + nodeName: c.NodeName, + natsConn: conn, + processes: newProcesses(), + toRingbufferCh: make(chan []subjectAndMessage), + metrics: newMetrics(c.PromHostAndPort), } // Create the default data folder for where subscribers should @@ -101,19 +101,13 @@ func (s *server) Start() { // Start the error kernel that will do all the error handling // not done within a process. s.errorKernel = newErrorKernel() - s.errorKernel.startErrorKernel(s.newMessagesCh) + s.errorKernel.startErrorKernel(s.toRingbufferCh) // Start collecting the metrics go s.startMetrics() - // Start the checking the input file for new messages from operator. - go s.getMessagesFromFile("./", "steward.sock", s.newMessagesCh) - - // // if enabled, start the sayHello I'm here service at the given interval - // // REMOVED: - // if s.publisherServices.sayHelloPublisher.interval != 0 { - // go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName)) - // } + // Start the checking the input socket for new messages from operator. + go s.readSocket(s.toRingbufferCh) // Start up the predefined subscribers. s.ProcessesStart() @@ -122,7 +116,7 @@ func (s *server) Start() { s.printProcessesMap() // Start the processing of new messages from an input channel. - s.routeMessagesToProcess("./incommmingBuffer.db", s.newMessagesCh) + s.routeMessagesToProcess("./incommmingBuffer.db", s.toRingbufferCh) select {} @@ -192,7 +186,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) { // Prepare and start a new ring buffer const bufferSize int = 1000 - rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.newMessagesCh) + rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh) inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValue) // start the ringbuffer. @@ -225,12 +219,12 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject // Check if the format of the message is correct. if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) + sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er) continue } if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method) - sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) + sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er) continue } @@ -264,7 +258,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) // fmt.Printf("*** %#v\n", proc) proc.spawnWorker(s) diff --git a/startup_processes.go b/startup_processes.go index 01cfce7..8923849 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -17,7 +17,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommand, CommandACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -28,7 +28,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName) sub := newSubject(TextLogging, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubTextLogging.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubTextLogging.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -39,7 +39,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubSayHello.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubSayHello.Values, nil) proc.procFuncCh = make(chan Message) proc.procFunc = func() error { @@ -70,7 +70,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubErrorLog.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubErrorLog.Values, nil) go proc.spawnWorker(s) } } @@ -80,7 +80,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName) sub := newSubject(ECHORequest, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoRequest.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoRequest.Values, nil) go proc.spawnWorker(s) } } @@ -90,7 +90,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName) sub := newSubject(ECHOReply, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoReply.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoReply.Values, nil) go proc.spawnWorker(s) } } @@ -100,7 +100,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommandRequest, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequest.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequest.Values, nil) go proc.spawnWorker(s) } } @@ -110,7 +110,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommandRequestNOSEQ, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequestNOSEQ.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequestNOSEQ.Values, nil) go proc.spawnWorker(s) } } @@ -120,7 +120,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommandReply, EventACK, s.nodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandReply.Values, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandReply.Values, nil) go proc.spawnWorker(s) } } @@ -135,7 +135,7 @@ func (s *server) ProcessesStart() { fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -157,7 +157,7 @@ func (s *server) ProcessesStart() { // In theory the system should drop the message before it reaches here. log.Printf("error: ProcessesStart: %v\n", err) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.toRingbufferCh <- []subjectAndMessage{sam} time.Sleep(time.Second * time.Duration(s.configuration.StartPubSayHello)) } }) diff --git a/steward.sock b/steward.sock deleted file mode 100644 index e69de29..0000000 diff --git a/subscriber_method_types.go b/subscriber_method_types.go index fd683e1..d21fdf0 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -223,7 +223,7 @@ func (m methodCLICommand) handler(proc process, message Message, node string) ([ case <-ctx.Done(): cancel() er := fmt.Errorf("error: method timed out %v", message) - sendErrorLogMessage(proc.newMessagesCh, proc.node, er) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) case out = <-outCh: cancel() } @@ -334,7 +334,7 @@ func (m methodEchoRequest) handler(proc process, message Message, node string) ( // In theory the system should drop the message before it reaches here. log.Printf("error: methodEchoRequest: %v\n", err) } - proc.newMessagesCh <- []subjectAndMessage{nSAM} + proc.toRingbufferCh <- []subjectAndMessage{nSAM} ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil @@ -398,7 +398,7 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str case <-ctx.Done(): cancel() er := fmt.Errorf("error: method timed out %v", message) - sendErrorLogMessage(proc.newMessagesCh, proc.node, er) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) case out := <-outCh: cancel() @@ -418,7 +418,7 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str // In theory the system should drop the message before it reaches here. log.Printf("error: methodCLICommandRequest: %v\n", err) } - proc.newMessagesCh <- []subjectAndMessage{nSAM} + proc.toRingbufferCh <- []subjectAndMessage{nSAM} } }() @@ -471,7 +471,7 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod case <-ctx.Done(): cancel() er := fmt.Errorf("error: method timed out %v", message) - sendErrorLogMessage(proc.newMessagesCh, proc.node, er) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) case out := <-outCh: cancel() @@ -491,7 +491,7 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod // In theory the system should drop the message before it reaches here. log.Printf("error: methodCLICommandRequest: %v\n", err) } - proc.newMessagesCh <- []subjectAndMessage{nSAM} + proc.toRingbufferCh <- []subjectAndMessage{nSAM} } }()