// Mirror of https://github.com/postmannen/ctrl.git (synced 2025-01-18 21:59:30 +00:00).
// File: ctrl/server.go

// Notes:
2021-02-01 11:13:38 +01:00
package steward
2021-01-25 15:23:00 +01:00
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/prometheus/client_golang/prometheus"
)
2021-02-26 07:55:28 +01:00
// processName is the name used to identify a process. It is built from a
// subject name and a process kind, see processNameGet.
type processName string
// Will return a process name made up of subjectName+processKind
2021-02-26 07:55:28 +01:00
func processNameGet(sn subjectName, pk processKind) processName {
pn := fmt.Sprintf("%s_%s", sn, pk)
return processName(pn)
}
// server is the structure that will hold the state about spawned
// processes on a local instance.
2021-01-27 14:02:57 +01:00
type server struct {
// The main background context
ctx context.Context
// The CancelFunc for the main context
2021-08-11 12:23:37 +02:00
cancel context.CancelFunc
// Configuration options used for running the server
configuration *Configuration
// The nats connection to the broker
2021-01-27 14:02:57 +01:00
natsConn *nats.Conn
2021-07-02 08:38:44 +02:00
// net listener for communicating via the steward socket
2021-08-09 14:41:31 +02:00
StewardSocket net.Listener
// processes holds all the information about running processes
processes *processes
// The name of the node
nodeName string
// ringBufferBulkInCh are the channel where new messages in a bulk
// format (slice) are put into the system.
2021-11-09 13:18:58 +01:00
//
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
ringBufferBulkInCh chan []subjectAndMessage
2021-02-24 10:58:02 +01:00
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
2021-09-15 08:39:34 +02:00
// Ring buffer
ringBuffer *ringBuffer
2021-02-18 12:29:14 +01:00
// metric exporter
metrics *metrics
// Version of package
version string
2022-01-08 04:19:51 +01:00
// tui client
tui *tui
// processInitial is the initial process that all other processes are tied to.
processInitial process
2021-01-27 14:02:57 +01:00
}
// newServer will prepare and return a server type
func NewServer(c *Configuration, version string) (*server, error) {
2021-08-11 12:23:37 +02:00
// Set up the main background context.
ctx, cancel := context.WithCancel(context.Background())
metrics := newMetrics(c.PromHostAndPort)
// Start the error kernel that will do all the error handling
// that is not done within a process.
errorKernel := newErrorKernel(ctx, metrics)
2021-04-19 21:06:37 +02:00
var opt nats.Option
2021-08-16 13:01:12 +02:00
2021-04-19 21:06:37 +02:00
if c.RootCAPath != "" {
opt = nats.RootCAs(c.RootCAPath)
}
2021-05-20 12:27:25 +02:00
if c.NkeySeedFile != "" {
var err error
2021-08-11 12:23:37 +02:00
2021-05-20 12:27:25 +02:00
opt, err = nats.NkeyOptionFromSeed(c.NkeySeedFile)
if err != nil {
cancel()
2021-05-20 12:27:25 +02:00
return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err)
}
}
var conn *nats.Conn
2021-08-16 13:01:12 +02:00
// Connect to the nats server, and retry until succesful.
for {
var err error
// Setting MaxReconnects to -1 which equals unlimited.
2021-12-17 17:28:57 +01:00
conn, err = nats.Connect(c.BrokerAddress,
opt,
nats.MaxReconnects(-1),
nats.ReconnectJitter(time.Duration(c.NatsReconnectJitter)*time.Millisecond, time.Duration(c.NatsReconnectJitterTLS)*time.Second),
nats.Timeout(time.Second*time.Duration(c.NatsConnOptTimeout)),
)
// If no servers where available, we loop and retry until succesful.
if err != nil {
log.Printf("error: could not connect, waiting %v seconds, and retrying: %v\n", c.NatsConnectRetryInterval, err)
time.Sleep(time.Duration(time.Second * time.Duration(c.NatsConnectRetryInterval)))
continue
}
2021-02-01 11:13:38 +01:00
break
}
2021-08-11 12:23:37 +02:00
2021-12-16 11:01:01 +01:00
log.Printf(" * conn.Opts.ReconnectJitterTLS: %v\n", conn.Opts.ReconnectJitterTLS)
log.Printf(" * conn.Opts.ReconnectJitter: %v\n", conn.Opts.ReconnectJitter)
2022-01-07 07:36:19 +01:00
var stewardSocket net.Listener
var err error
// Open the steward socket file, and start the listener if enabled.
2022-01-07 07:36:19 +01:00
if c.EnableSocket {
stewardSocket, err = createSocket(c.SocketFolder, "steward.sock")
if err != nil {
cancel()
return nil, err
}
2021-03-30 10:37:16 +02:00
}
2022-01-08 04:19:51 +01:00
// Create the tui client structure if enabled.
var tuiClient *tui
if c.EnableTUI {
2022-01-13 08:34:22 +01:00
tuiClient, err = newTui(Node(c.NodeName))
2022-01-08 04:19:51 +01:00
if err != nil {
cancel()
return nil, err
}
}
s := &server{
ctx: ctx,
cancel: cancel,
configuration: c,
nodeName: c.NodeName,
natsConn: conn,
StewardSocket: stewardSocket,
processes: newProcesses(ctx, metrics, tuiClient, errorKernel),
ringBufferBulkInCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
tui: tuiClient,
errorKernel: errorKernel,
}
// Create the default data folder for where subscribers should
2021-03-25 12:50:58 +01:00
// write it's data, check if data folder exist, and create it if needed.
if _, err := os.Stat(c.SubscribersDataFolder); os.IsNotExist(err) {
if c.SubscribersDataFolder == "" {
return nil, fmt.Errorf("error: subscribersDataFolder value is empty, you need to provide the config or the flag value at startup %v: %v", c.SubscribersDataFolder, err)
}
err := os.Mkdir(c.SubscribersDataFolder, 0700)
if err != nil {
2021-05-20 12:27:25 +02:00
return nil, fmt.Errorf("error: failed to create data folder directory %v: %v", c.SubscribersDataFolder, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", c.SubscribersDataFolder)
}
2021-02-05 07:25:12 +01:00
return s, nil
}
// createSocket creates the socket file socketFileName inside socketFolder
// and returns the net.Listener for communicating over that socket.
//
// The folder is created, including missing parents, when it does not exist.
// Anything already present at the socket path, for example a socket file
// left behind by an earlier run, is removed before the socket is opened.
//
// The returned errors wrap the underlying cause, so callers can inspect
// them with errors.Is and errors.As.
func createSocket(socketFolder string, socketFileName string) (net.Listener, error) {
	// MkdirAll does nothing when the folder is already there, so no
	// separate existence check is needed.
	if err := os.MkdirAll(socketFolder, 0700); err != nil {
		return nil, fmt.Errorf("error: failed to create socket folder directory %v: %w", socketFolder, err)
	}

	// Just as an extra check we delete any existing socket file. Removing
	// directly, and accepting "does not exist", avoids the gap between a
	// separate stat and the remove.
	socketFilepath := filepath.Join(socketFolder, socketFileName)
	if err := os.Remove(socketFilepath); err != nil && !errors.Is(err, os.ErrNotExist) {
		return nil, fmt.Errorf("error: could not delete sock file: %w", err)
	}

	// Open the socket.
	nl, err := net.Listen("unix", socketFilepath)
	if err != nil {
		return nil, fmt.Errorf("error: failed to open socket: %w", err)
	}

	return nl, nil
}
2021-02-24 10:58:02 +01:00
// Start will spawn up all the predefined subscriber processes.
2021-02-10 07:25:44 +01:00
// Spawning of publisher processes is done on the fly by checking
2021-02-24 10:58:02 +01:00
// if there is publisher process for a given message subject, and
2021-08-16 13:01:12 +02:00
// if it does not exist it will spawn one.
func (s *server) Start() {
2021-09-23 05:46:25 +02:00
log.Printf("Starting steward, version=%+v\n", s.version)
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
2021-08-04 10:37:24 +02:00
go func() {
err := s.errorKernel.start(s.ringBufferBulkInCh)
2021-08-04 10:37:24 +02:00
if err != nil {
log.Printf("%v\n", err)
}
}()
2021-02-18 12:29:14 +01:00
// Start collecting the metrics
2021-08-03 13:57:29 +02:00
go func() {
2021-08-04 10:37:24 +02:00
err := s.metrics.start()
2021-08-03 13:57:29 +02:00
if err != nil {
log.Printf("%v\n", err)
os.Exit(1)
}
}()
2021-02-18 12:29:14 +01:00
// Start the checking the input socket for new messages from operator.
2022-01-07 07:36:19 +01:00
if s.configuration.EnableSocket {
go s.readSocket()
}
2021-09-10 05:26:16 +02:00
// Check if we should start the tcp listener for new messages from operator.
if s.configuration.TCPListener != "" {
go s.readTCPListener()
}
2021-09-10 05:26:16 +02:00
// Check if we should start the http listener for new messages from operator.
if s.configuration.HTTPListener != "" {
go s.readHttpListener()
}
// Start up the predefined subscribers.
//
// Since all the logic to handle processes are tied to the process
// struct, we need to create an initial process to start the rest.
2021-08-11 12:23:37 +02:00
//
// NB: The context of the initial process are set in processes.Start.
2021-08-09 09:18:30 +02:00
sub := newSubject(REQInitial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil)
// Start all wanted subscriber processes.
s.processes.Start(s.processInitial)
2021-08-09 09:18:30 +02:00
time.Sleep(time.Second * 1)
s.processes.printProcessesMap()
2021-08-23 12:47:33 +02:00
// Start exposing the the data folder via HTTP if flag is set.
if s.configuration.ExposeDataFolder != "" {
log.Printf("info: Starting expose of data folder via HTTP\n")
go s.exposeDataFolder(s.ctx)
}
2022-01-08 04:19:51 +01:00
if s.configuration.EnableTUI {
go func() {
err := s.tui.Start(s.ctx, s.ringBufferBulkInCh)
2022-01-08 04:19:51 +01:00
if err != nil {
log.Printf("%v\n", err)
os.Exit(1)
}
}()
}
2021-08-09 09:18:30 +02:00
// Start the processing of new messages from an input channel.
2021-09-15 08:39:34 +02:00
// NB: We might need to create a sub context for the ringbuffer here
// so we can cancel this context last, and not use the server.
2021-08-25 08:50:24 +02:00
s.routeMessagesToProcess("./incomingBuffer.db")
2021-02-10 07:25:44 +01:00
2022-01-26 09:23:02 +01:00
// Check and enable read the messages specified in the startup folder.
s.readStartupFolder()
2021-08-09 09:18:30 +02:00
}
2021-08-09 09:18:30 +02:00
// Will stop all processes started during startup.
func (s *server) Stop() {
// Stop the started pub/sub message processes.
2021-08-11 12:23:37 +02:00
s.processes.Stop()
2021-08-09 09:18:30 +02:00
log.Printf("info: stopped all subscribers\n")
2021-07-02 08:38:44 +02:00
2021-08-09 09:18:30 +02:00
// Stop the errorKernel.
s.errorKernel.stop()
log.Printf("info: stopped the errorKernel\n")
2021-07-02 13:26:30 +02:00
2021-08-09 09:18:30 +02:00
// Stop the main context.
2021-08-11 12:23:37 +02:00
s.cancel()
2021-08-09 09:18:30 +02:00
log.Printf("info: stopped the main context\n")
2021-08-09 14:41:31 +02:00
2022-01-05 19:53:26 +01:00
// Delete the steward socket file when the program exits.
2021-08-09 14:41:31 +02:00
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
log.Printf("%v\n", er)
}
}
}
2021-09-23 08:31:30 +02:00
// sendInfoMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
2022-01-21 06:15:26 +01:00
//
// DEPRECATED:
// func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// // NB: Adding log statement here for more visuality during development.
// log.Printf("%v\n", theError)
// sam := createErrorMsgContent(conf, FromNode, theError)
// ringBufferBulkInCh <- []subjectAndMessage{sam}
//
// metrics.promInfoMessagesSentTotal.Inc()
// }
2021-09-23 08:31:30 +02:00
2021-02-25 11:08:05 +01:00
// createErrorMsgContent will prepare a subject and message with the content
// of the error
func createErrorMsgContent(conf *Configuration, FromNode Node, theError error) subjectAndMessage {
// Add time stamp
2021-09-23 08:19:53 +02:00
er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), FromNode, theError.Error())
sam := subjectAndMessage{
2021-04-06 07:56:49 +02:00
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog",
ToNode: "errorCentral",
FromNode: FromNode,
FileName: "error.log",
Data: []string{er},
Method: REQErrorLog,
ACKTimeout: conf.ErrorMessageTimeout,
Retries: conf.ErrorMessageRetries,
},
}
2021-02-25 11:08:05 +01:00
return sam
}
2021-03-09 07:43:55 +01:00
// Contains the sam value as it is used in the state DB, and also a
// delivered function to be called when this message is picked up, so
// we can control if messages gets stale at some point.
type samDBValueAndDelivered struct {
2021-07-02 18:32:01 +02:00
samDBValue samDBValue
delivered func()
2021-07-02 18:32:01 +02:00
}
2021-08-25 08:50:24 +02:00
// routeMessagesToProcess takes a database name it's input argument.
// The database will be used as the persistent k/v store for the work
// queue which is implemented as a ring buffer.
// The ringBufferInCh are where we get new messages to publish.
2021-03-09 07:43:55 +01:00
// Incomming messages will be routed to the correct subject process, where
// the handling of each nats subject is handled within it's own separate
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
2021-08-25 08:50:24 +02:00
func (s *server) routeMessagesToProcess(dbFileName string) {
2021-03-09 07:43:55 +01:00
// Prepare and start a new ring buffer
2021-12-21 07:21:12 +01:00
var bufferSize int = s.configuration.RingBufferSize
const samValueBucket string = "samValueBucket"
const indexValueBucket string = "indexValueBucket"
s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.ringBufferBulkInCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial)
2021-08-26 07:02:36 +02:00
ringBufferInCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValueAndDelivered)
2021-03-09 07:43:55 +01:00
// start the ringbuffer.
s.ringBuffer.start(s.ctx, ringBufferInCh, ringBufferOutCh)
2021-03-09 07:43:55 +01:00
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.
2022-01-20 07:50:58 +01:00
// Since the new messages comming into the system is a []subjectAndMessage
// we loop here, unfold the slice, and put single subjectAndMessages's on
// the channel to the ringbuffer.
2021-03-09 07:43:55 +01:00
go func() {
for sams := range s.ringBufferBulkInCh {
2021-08-25 10:16:55 +02:00
for _, sam := range sams {
ringBufferInCh <- sam
2021-03-09 07:43:55 +01:00
}
}
close(ringBufferInCh)
2021-03-09 07:43:55 +01:00
}()
// Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and if no subject
// exist throw an error.
2022-01-27 07:19:04 +01:00
var event Event
eventAvailable := event.CheckEventAvailable()
2021-03-09 07:43:55 +01:00
var method Method
methodsAvailable := method.GetMethodsAvailable()
go func() {
2021-11-09 13:18:58 +01:00
for samDBVal := range ringBufferOutCh {
// Signal back to the ringbuffer that message have been picked up.
2021-11-09 13:18:58 +01:00
samDBVal.delivered()
2021-09-13 07:02:14 +02:00
2021-11-09 13:18:58 +01:00
sam := samDBVal.samDBValue.Data
2021-03-09 07:43:55 +01:00
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er)
2021-03-09 07:43:55 +01:00
continue
}
2022-01-27 07:19:04 +01:00
if !eventAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er)
2021-03-09 07:43:55 +01:00
continue
}
2021-09-21 23:29:42 +02:00
for {
2021-11-10 11:21:38 +01:00
// Looping here so we are able to redo the sending
// of the last message if a process for the specified subject
2021-09-21 23:29:42 +02:00
// is not present. The process will then be created, and
// the code will loop back here.
m := sam.Message
2021-11-10 11:21:38 +01:00
// Check if it is a relay message
2021-11-10 12:58:23 +01:00
if m.RelayViaNode != "" && m.RelayViaNode != Node(s.nodeName) {
2021-11-10 11:21:38 +01:00
// Keep the original values.
m.RelayFromNode = m.FromNode
m.RelayToNode = m.ToNode
m.RelayOriginalViaNode = m.RelayViaNode
2021-11-10 11:21:38 +01:00
m.RelayOriginalMethod = m.Method
// Convert it to a relay initial message.
m.Method = REQRelayInitial
// Set the toNode of the message to this host, so we send
// it to ourselves again and pick it up with the subscriber
// for the REQReplyInitial handler method.
m.ToNode = Node(s.nodeName)
// We are now done with the initial checking for if the new
// message is a relay message, so we empty the viaNode field
// so we don't end in an endless loop here.
// The value is stored in RelayOriginalViaNode for later use.
m.RelayViaNode = ""
2021-11-10 11:21:38 +01:00
sam.Subject = newSubject(REQRelayInitial, string(s.nodeName))
2021-11-10 11:21:38 +01:00
}
2021-09-21 23:29:42 +02:00
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
2021-11-16 10:21:44 +01:00
s.processes.active.mu.Lock()
2021-11-16 19:07:24 +01:00
proc, ok := s.processes.active.procNames[pn]
2021-11-16 10:21:44 +01:00
s.processes.active.mu.Unlock()
2021-09-21 23:29:42 +02:00
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
2021-11-16 10:21:44 +01:00
if ok {
2021-09-21 23:29:42 +02:00
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
break
} else {
2021-11-10 06:22:03 +01:00
// If a publisher process do not exist for the given subject, create it.
2022-01-13 08:34:22 +01:00
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
2021-09-21 23:29:42 +02:00
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
2021-11-09 14:01:42 +01:00
2021-09-21 23:29:42 +02:00
proc.spawnWorker(s.processes, s.natsConn)
2022-01-13 08:34:22 +01:00
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
2021-09-21 23:29:42 +02:00
2021-11-10 06:22:03 +01:00
// Now when the process is spawned we continue,
2021-09-21 23:29:42 +02:00
// and send the message to that new process.
continue
}
2021-03-09 07:43:55 +01:00
}
}
}()
}
2021-08-23 12:47:33 +02:00
func (s *server) exposeDataFolder(ctx context.Context) {
2021-08-23 17:05:56 +02:00
fileHandler := func(w http.ResponseWriter, r *http.Request) {
// w.Header().Set("Content-Type", "text/html")
http.FileServer(http.Dir(s.configuration.SubscribersDataFolder)).ServeHTTP(w, r)
}
2021-08-23 12:47:33 +02:00
//create a file server, and serve the files found in ./
2021-08-23 17:05:56 +02:00
//fd := http.FileServer(http.Dir(s.configuration.SubscribersDataFolder))
http.HandleFunc("/", fileHandler)
2021-08-23 12:47:33 +02:00
// we create a net.Listen type to use later with the http.Serve function.
nl, err := net.Listen("tcp", s.configuration.ExposeDataFolder)
if err != nil {
log.Println("error: starting net.Listen: ", err)
}
// start the web server with http.Serve instead of the usual http.ListenAndServe
err = http.Serve(nl, nil)
if err != nil {
log.Printf("Error: failed to start web server: %v\n", err)
}
os.Exit(1)
}