1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 12:59:15 +00:00
ctrl/server.go

413 lines
13 KiB
Go
Raw Normal View History

// Notes:
2021-02-01 10:13:38 +00:00
package steward
2021-01-25 14:23:00 +00:00
import (
"context"
2021-01-25 14:23:00 +00:00
"fmt"
"log"
2021-03-30 08:37:16 +00:00
"net"
"os"
2021-05-12 07:50:03 +00:00
"path/filepath"
"strings"
2021-01-25 14:23:00 +00:00
"time"
"github.com/nats-io/nats.go"
)
2021-02-26 06:55:28 +00:00
// processName is the name of a process, as built by processNameGet from
// a subject name and a process kind.
type processName string
// processNameGet returns a process name made up of subjectName+"_"+processKind.
2021-02-26 06:55:28 +00:00
func processNameGet(sn subjectName, pk processKind) processName {
	// The subject name and the process kind are joined with an underscore.
	return processName(fmt.Sprintf("%s_%s", sn, pk))
}
// server is the structure that will hold the state about spawned
// processes on a local instance.
2021-01-27 13:02:57 +00:00
type server struct {
// The main background context
ctx context.Context
// The CancelFunc for the main context
2021-08-11 10:23:37 +00:00
cancel context.CancelFunc
// Configuration options used for running the server
configuration *Configuration
// The nats connection to the broker
2021-01-27 13:02:57 +00:00
natsConn *nats.Conn
2021-07-02 06:38:44 +00:00
// net listener for communicating via the steward socket
2021-08-09 12:41:31 +00:00
StewardSocket net.Listener
2021-07-02 06:38:44 +00:00
// net listener for the communication with Stew
2021-08-09 12:41:31 +00:00
StewSocket net.Listener
// processes holds all the information about running processes
processes *processes
// The name of the node
nodeName string
2021-02-24 09:58:02 +00:00
// Mutex for locking when writing to the process map
toRingbufferCh chan []subjectAndMessage
2021-02-24 09:58:02 +00:00
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
2021-02-18 11:29:14 +00:00
// metric exporter
metrics *metrics
2021-01-27 13:02:57 +00:00
}
// newServer will prepare and return a server type
func NewServer(c *Configuration) (*server, error) {
2021-08-11 10:23:37 +00:00
// Set up the main background context.
ctx, cancel := context.WithCancel(context.Background())
2021-04-19 19:06:37 +00:00
var opt nats.Option
2021-08-16 11:01:12 +00:00
2021-04-19 19:06:37 +00:00
if c.RootCAPath != "" {
opt = nats.RootCAs(c.RootCAPath)
}
2021-05-20 10:27:25 +00:00
if c.NkeySeedFile != "" {
var err error
2021-08-11 10:23:37 +00:00
2021-05-20 10:27:25 +00:00
opt, err = nats.NkeyOptionFromSeed(c.NkeySeedFile)
if err != nil {
cancel()
2021-05-20 10:27:25 +00:00
return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err)
}
}
var conn *nats.Conn
const connRetryWait = 5
2021-08-16 11:01:12 +00:00
// Connect to the nats server, and retry until succesful.
for {
var err error
// Setting MaxReconnects to -1 which equals unlimited.
conn, err = nats.Connect(c.BrokerAddress, opt, nats.MaxReconnects(-1))
// Nats use string types for errors, so we need to check the content of the error.
// If no servers where available, we loop and retry until succesful.
if err != nil {
if strings.Contains(err.Error(), "no servers available") {
log.Printf("error: could not connect, waiting 5 seconds, and retrying: %v\n", err)
time.Sleep(time.Duration(time.Second * connRetryWait))
continue
}
er := fmt.Errorf("error: nats.Connect failed: %v", err)
cancel()
return nil, er
}
2021-02-01 10:13:38 +00:00
break
}
2021-08-11 10:23:37 +00:00
2021-07-02 06:38:44 +00:00
// Prepare the connection to the Steward socket file
2021-05-12 07:50:03 +00:00
// Check if socket folder exists, if not create it
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
err := os.MkdirAll(c.SocketFolder, 0700)
if err != nil {
cancel()
2021-05-20 10:27:25 +00:00
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
2021-05-12 07:50:03 +00:00
}
}
2021-08-11 10:23:37 +00:00
// Just as an extra check we eventually delete any existing Steward socket files if found.
2021-05-12 07:50:03 +00:00
socketFilepath := filepath.Join(c.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
cancel()
2021-05-12 07:50:03 +00:00
return nil, er
}
2021-03-30 08:37:16 +00:00
}
2021-08-11 10:23:37 +00:00
// Open the socket.
2021-05-12 07:50:03 +00:00
nl, err := net.Listen("unix", socketFilepath)
2021-03-30 08:37:16 +00:00
if err != nil {
2021-04-16 11:18:10 +00:00
er := fmt.Errorf("error: failed to open socket: %v", err)
cancel()
2021-04-16 11:18:10 +00:00
return nil, er
2021-03-30 08:37:16 +00:00
}
2021-07-02 06:38:44 +00:00
// ---
// Prepare the connection to the Stew socket file
// Check if socket folder exists, if not create it
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
err := os.MkdirAll(c.SocketFolder, 0700)
if err != nil {
cancel()
2021-07-02 06:38:44 +00:00
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
}
}
stewSocketFilepath := filepath.Join(c.SocketFolder, "stew.sock")
2021-08-11 10:23:37 +00:00
// Just as an extra check we eventually delete any existing Stew socket files if found.
2021-07-02 06:38:44 +00:00
if _, err := os.Stat(stewSocketFilepath); !os.IsNotExist(err) {
err = os.Remove(stewSocketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete stew.sock file: %v", err)
cancel()
2021-07-02 06:38:44 +00:00
return nil, er
}
}
stewNL, err := net.Listen("unix", stewSocketFilepath)
if err != nil {
er := fmt.Errorf("error: failed to open stew socket: %v", err)
cancel()
2021-07-02 06:38:44 +00:00
return nil, er
}
// ---
metrics := newMetrics(c.PromHostAndPort)
s := &server{
2021-08-09 12:41:31 +00:00
ctx: ctx,
2021-08-11 10:23:37 +00:00
cancel: cancel,
2021-08-09 12:41:31 +00:00
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
StewardSocket: nl,
StewSocket: stewNL,
2021-08-18 10:16:21 +00:00
processes: newProcesses(ctx, metrics),
2021-08-09 12:41:31 +00:00
toRingbufferCh: make(chan []subjectAndMessage),
metrics: metrics,
}
// Create the default data folder for where subscribers should
2021-03-25 11:50:58 +00:00
// write it's data, check if data folder exist, and create it if needed.
if _, err := os.Stat(c.SubscribersDataFolder); os.IsNotExist(err) {
if c.SubscribersDataFolder == "" {
return nil, fmt.Errorf("error: subscribersDataFolder value is empty, you need to provide the config or the flag value at startup %v: %v", c.SubscribersDataFolder, err)
}
err := os.Mkdir(c.SubscribersDataFolder, 0700)
if err != nil {
2021-05-20 10:27:25 +00:00
return nil, fmt.Errorf("error: failed to create data folder directory %v: %v", c.SubscribersDataFolder, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", c.SubscribersDataFolder)
}
2021-02-05 06:25:12 +00:00
return s, nil
}
2021-02-24 09:58:02 +00:00
// Start will spawn up all the predefined subscriber processes.
2021-02-10 06:25:44 +00:00
// Spawning of publisher processes is done on the fly by checking
2021-02-24 09:58:02 +00:00
// if there is publisher process for a given message subject, and
2021-08-16 11:01:12 +00:00
// if it does not exist it will spawn one.
func (s *server) Start() {
// Start the error kernel that will do all the error handling
2021-08-03 11:57:29 +00:00
// that is not done within a process.
2021-08-04 08:37:24 +00:00
s.errorKernel = newErrorKernel(s.ctx)
2021-08-04 08:37:24 +00:00
go func() {
err := s.errorKernel.start(s.toRingbufferCh)
if err != nil {
log.Printf("%v\n", err)
}
}()
2021-02-18 11:29:14 +00:00
// Start collecting the metrics
2021-08-03 11:57:29 +00:00
go func() {
2021-08-04 08:37:24 +00:00
err := s.metrics.start()
2021-08-03 11:57:29 +00:00
if err != nil {
log.Printf("%v\n", err)
os.Exit(1)
}
}()
2021-02-18 11:29:14 +00:00
// Start the checking the input socket for new messages from operator.
go s.readSocket(s.toRingbufferCh)
// Start up the predefined subscribers.
//
// Since all the logic to handle processes are tied to the process
// struct, we need to create an initial process to start the rest.
2021-08-11 10:23:37 +00:00
//
// NB: The context of the initial process are set in processes.Start.
2021-08-09 07:18:30 +00:00
sub := newSubject(REQInitial, s.nodeName)
2021-08-18 10:16:21 +00:00
p := newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
// Start all wanted subscriber processes.
s.processes.Start(p)
2021-08-09 07:18:30 +00:00
time.Sleep(time.Second * 1)
s.processes.printProcessesMap()
2021-08-09 07:18:30 +00:00
// Start the processing of new messages from an input channel.
s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh)
2021-02-10 06:25:44 +00:00
2021-08-09 07:18:30 +00:00
}
2021-08-09 07:18:30 +00:00
// Will stop all processes started during startup.
func (s *server) Stop() {
// Stop the started pub/sub message processes.
2021-08-11 10:23:37 +00:00
s.processes.Stop()
2021-08-09 07:18:30 +00:00
log.Printf("info: stopped all subscribers\n")
2021-07-02 06:38:44 +00:00
2021-08-09 07:18:30 +00:00
// Stop the errorKernel.
s.errorKernel.stop()
log.Printf("info: stopped the errorKernel\n")
2021-07-02 11:26:30 +00:00
2021-08-09 07:18:30 +00:00
// Stop the main context.
2021-08-11 10:23:37 +00:00
s.cancel()
2021-08-09 07:18:30 +00:00
log.Printf("info: stopped the main context\n")
2021-08-09 12:41:31 +00:00
// Delete the socket file when the program exits.
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
log.Printf("%v\n", er)
}
}
}
2021-02-25 10:08:05 +00:00
// sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
2021-06-29 06:21:42 +00:00
func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
2021-03-26 04:13:51 +00:00
// NB: Adding log statement here for more visuality during development.
log.Printf("%v\n", theError)
2021-02-25 10:08:05 +00:00
sam := createErrorMsgContent(FromNode, theError)
newMessagesCh <- []subjectAndMessage{sam}
}
2021-02-25 10:08:05 +00:00
// createErrorMsgContent will prepare a subject and message with the content
// of the error
2021-06-29 06:21:42 +00:00
func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
// Add time stamp
er := fmt.Sprintf("%v, %v\n", time.Now().UTC(), theError.Error())
sam := subjectAndMessage{
2021-04-06 05:56:49 +00:00
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog",
ToNode: "errorCentral",
FromNode: FromNode,
FileExtension: ".log",
Data: []string{er},
Method: REQErrorLog,
},
}
2021-02-25 10:08:05 +00:00
return sam
}
2021-03-09 06:43:55 +00:00
// Contains the sam value as it is used in the state DB, and also a
// delivered function to be called when this message is picked up, so
// we can control if messages gets stale at some point.
type samDBValueAndDelivered struct {
2021-07-02 16:32:01 +00:00
samDBValue samDBValue
delivered func()
2021-07-02 16:32:01 +00:00
}
2021-03-10 06:11:14 +00:00
// routeMessagesToProcess takes a database name and an input channel as
2021-03-09 06:43:55 +00:00
// it's input arguments.
// The database will be used as the persistent store for the work queue
// which is implemented as a ring buffer.
// The input channel are where we read new messages to publish.
// Incomming messages will be routed to the correct subject process, where
// the handling of each nats subject is handled within it's own separate
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
2021-03-10 06:11:14 +00:00
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
2021-03-09 06:43:55 +00:00
// Prepare and start a new ring buffer
const bufferSize int = 1000
2021-06-29 06:21:42 +00:00
rb := newringBuffer(*s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh)
2021-03-09 06:43:55 +00:00
inCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValueAndDelivered)
2021-03-09 06:43:55 +00:00
// start the ringbuffer.
rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries)
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.
go func() {
for samSlice := range newSAM {
for _, sam := range samSlice {
inCh <- sam
}
}
close(inCh)
}()
// Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and if no subject
// exist throw an error.
var coe CommandOrEvent
coeAvailable := coe.GetCommandOrEventAvailable()
var method Method
methodsAvailable := method.GetMethodsAvailable()
go func() {
for samTmp := range ringBufferOutCh {
samTmp.delivered()
sam := samTmp.samDBValue.Data
2021-03-09 06:43:55 +00:00
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
2021-06-29 06:21:42 +00:00
sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er)
2021-03-09 06:43:55 +00:00
continue
}
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
2021-06-29 06:21:42 +00:00
sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er)
2021-03-09 06:43:55 +00:00
continue
}
redo:
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present. The process will then be created, and
// the code will loop back to the redo: label.
m := sam.Message
subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet(subjName, processKindPublisher)
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
s.processes.mu.Lock()
existingProcIDMap, ok := s.processes.active[pn]
s.processes.mu.Unlock()
2021-03-09 06:43:55 +00:00
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
2021-03-09 06:43:55 +00:00
if ok {
s.processes.mu.Lock()
for _, existingProc := range existingProcIDMap {
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
existingProc.subject.messageCh <- m
}
s.processes.mu.Unlock()
2021-03-09 06:43:55 +00:00
// If no process to handle the specific subject exist,
// the we create and spawn one.
} else {
// If a publisher process do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
2021-08-18 10:16:21 +00:00
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
2021-03-09 06:43:55 +00:00
// fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s.processes, s.natsConn)
2021-03-09 06:43:55 +00:00
// Now when the process is spawned we jump back to the redo: label,
// and send the message to that new process.
goto redo
}
}
}()
}