1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

added initital code for stew socket

This commit is contained in:
postmannen 2021-07-02 08:38:44 +02:00
parent 00b439d6d2
commit 4929191269
2 changed files with 73 additions and 12 deletions

View file

@ -14,7 +14,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
// Loop, and wait for new connections.
for {
conn, err := s.netListener.Accept()
conn, err := s.StewardSockListener.Accept()
if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
@ -52,6 +52,11 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
}
}
// TODO: Create the writer go routine for this socket.
func (s *server) writeStewSocket(toStewSocketCh []byte) {
//s.StewSockListener
}
type subjectAndMessage struct {
Subject `json:"subject" yaml:"subject"`
Message `json:"message" yaml:"message"`

View file

@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
@ -65,8 +66,10 @@ type server struct {
configuration *Configuration
// The nats connection to the broker
natsConn *nats.Conn
// net listener for communicating via the socket
netListener net.Listener
// net listener for communicating via the steward socket
StewardSockListener net.Listener
// net listener for the communication with Stew
StewSockListener net.Listener
// processes holds all the information about running processes
processes *processes
// The name of the node
@ -127,7 +130,7 @@ func NewServer(c *Configuration) (*server, error) {
break
}
// Prepare the connection to the socket file
// Prepare the connection to the Steward socket file
// Check if socket folder exists, if not create it
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
@ -153,16 +156,47 @@ func NewServer(c *Configuration) (*server, error) {
return nil, er
}
// ---
// Prepare the connection to the Stew socket file
// Check if socket folder exists, if not create it
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
err := os.MkdirAll(c.SocketFolder, 0700)
if err != nil {
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
}
}
stewSocketFilepath := filepath.Join(c.SocketFolder, "stew.sock")
if _, err := os.Stat(stewSocketFilepath); !os.IsNotExist(err) {
err = os.Remove(stewSocketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete stew.sock file: %v", err)
return nil, er
}
}
stewNL, err := net.Listen("unix", stewSocketFilepath)
if err != nil {
er := fmt.Errorf("error: failed to open stew socket: %v", err)
return nil, er
}
// ---
metrics := newMetrics(c.PromHostAndPort)
s := &server{
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
netListener: nl,
processes: newProcesses(metrics.promRegistry),
toRingbufferCh: make(chan []subjectAndMessage),
metrics: metrics,
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
StewardSockListener: nl,
StewSockListener: stewNL,
processes: newProcesses(metrics.promRegistry),
toRingbufferCh: make(chan []subjectAndMessage),
metrics: metrics,
}
// Create the default data folder for where subscribers should
@ -199,6 +233,19 @@ func (s *server) Start() {
// Start the checking the input socket for new messages from operator.
go s.readSocket(s.toRingbufferCh)
// Delete the socket file when the program exits.
defer func() {
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
log.Printf("%v\n", er)
}
}
}()
// Start up the predefined subscribers. Since all the logic to handle
// processes are tied to the process struct, we need to create an
// initial process to start the rest.
@ -212,7 +259,16 @@ func (s *server) Start() {
// Start the processing of new messages from an input channel.
s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh)
select {}
// Set up channel on which to send signal notifications.
// We must use a buffered channel or risk missing the signal
// if we're not ready to receive when the signal is sent.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
//Block until we receive a signal
sig := <-sigCh
fmt.Printf("Got exit signal, terminating all processes, %v\n", sig)
return
}