From 4929191269f78f0fb5c6797730ae12de47b0cfb7 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 2 Jul 2021 08:38:44 +0200 Subject: [PATCH] added initital code for stew socket --- read_socket.go | 7 ++++- server.go | 78 +++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/read_socket.go b/read_socket.go index 64dadd4..44e8e82 100644 --- a/read_socket.go +++ b/read_socket.go @@ -14,7 +14,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { // Loop, and wait for new connections. for { - conn, err := s.netListener.Accept() + conn, err := s.StewardSockListener.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) @@ -52,6 +52,11 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { } } +// TODO: Create the writer go routine for this socket. +func (s *server) writeStewSocket(toStewSocketCh []byte) { + //s.StewSockListener +} + type subjectAndMessage struct { Subject `json:"subject" yaml:"subject"` Message `json:"message" yaml:"message"` diff --git a/server.go b/server.go index 1564cda..24cc122 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "os/signal" "path/filepath" "strings" "sync" @@ -65,8 +66,10 @@ type server struct { configuration *Configuration // The nats connection to the broker natsConn *nats.Conn - // net listener for communicating via the socket - netListener net.Listener + // net listener for communicating via the steward socket + StewardSockListener net.Listener + // net listener for the communication with Stew + StewSockListener net.Listener // processes holds all the information about running processes processes *processes // The name of the node @@ -127,7 +130,7 @@ func NewServer(c *Configuration) (*server, error) { break } - // Prepare the connection to the socket file + // Prepare the connection to the Steward socket file // Check if socket folder exists, if not create it if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) { @@ -153,16 +156,47 @@ func NewServer(c *Configuration) (*server, error) { return nil, er } + // --- + + // Prepare the connection to the Stew socket file + + // Check if socket folder exists, if not create it + if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) { + err := os.MkdirAll(c.SocketFolder, 0700) + if err != nil { + return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err) + } + } + + stewSocketFilepath := filepath.Join(c.SocketFolder, "stew.sock") + + if _, err := os.Stat(stewSocketFilepath); !os.IsNotExist(err) { + err = os.Remove(stewSocketFilepath) + if err != nil { + er := fmt.Errorf("error: could not delete stew.sock file: %v", err) + return nil, er + } + } + + stewNL, err := net.Listen("unix", stewSocketFilepath) + if err != nil { + er := fmt.Errorf("error: failed to open stew socket: %v", err) + return nil, er + } + + // --- + metrics := newMetrics(c.PromHostAndPort) s := &server{ - configuration: c, - nodeName: c.NodeName, - natsConn: conn, - netListener: nl, - processes: newProcesses(metrics.promRegistry), - toRingbufferCh: make(chan []subjectAndMessage), - metrics: metrics, + configuration: c, + nodeName: c.NodeName, + natsConn: conn, + StewardSockListener: nl, + StewSockListener: stewNL, + processes: newProcesses(metrics.promRegistry), + toRingbufferCh: make(chan []subjectAndMessage), + metrics: metrics, } // Create the default data folder for where subscribers should @@ -199,6 +233,19 @@ func (s *server) Start() { // Start the checking the input socket for new messages from operator. go s.readSocket(s.toRingbufferCh) + // Delete the socket file when the program exits. + defer func() { + socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock") + + if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) { + err = os.Remove(socketFilepath) + if err != nil { + er := fmt.Errorf("error: could not delete sock file: %v", err) + log.Printf("%v\n", er) + } + } + }() + // Start up the predefined subscribers. Since all the logic to handle // processes are tied to the process struct, we need to create an // initial process to start the rest. @@ -212,7 +259,16 @@ func (s *server) Start() { // Start the processing of new messages from an input channel. s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh) - select {} + // Set up channel on which to send signal notifications. + // We must use a buffered channel or risk missing the signal + // if we're not ready to receive when the signal is sent. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + //Block until we receive a signal + sig := <-sigCh + fmt.Printf("Got exit signal, terminating all processes, %v\n", sig) + return }