2021-01-28 11:17:54 +01:00
// Notes:
2021-02-01 11:13:38 +01:00
package steward
2021-01-25 15:23:00 +01:00
import (
"fmt"
"log"
2021-03-30 10:37:16 +02:00
"net"
2021-03-02 13:46:02 +01:00
"os"
2021-05-12 09:50:03 +02:00
"path/filepath"
2021-05-25 10:23:27 +02:00
"strings"
2021-01-28 14:58:16 +01:00
"sync"
2021-01-25 15:23:00 +01:00
"time"
"github.com/nats-io/nats.go"
2021-02-19 16:58:16 +01:00
"github.com/prometheus/client_golang/prometheus"
2021-04-12 10:51:26 +02:00
"github.com/prometheus/client_golang/prometheus/promauto"
2021-01-25 15:23:00 +01:00
)
2021-02-26 07:55:28 +01:00
// processName is the name of a process. It is built by processNameGet
// from a subject name and a process kind, and is used as the key of the
// processes.active map.
type processName string
2021-03-09 04:55:51 +01:00
// Will return a process name made up of subjectName+processKind
2021-02-26 07:55:28 +01:00
func processNameGet ( sn subjectName , pk processKind ) processName {
pn := fmt . Sprintf ( "%s_%s" , sn , pk )
return processName ( pn )
}
2021-03-03 15:44:32 +01:00
// processes holds all the information about running processes
type processes struct {
// The active spawned processes
2021-06-08 13:56:31 +02:00
active map [ processName ] map [ int ] process
2021-03-03 15:44:32 +01:00
// mutex to lock the map
2021-03-09 07:43:55 +01:00
mu sync . RWMutex
2021-03-03 15:44:32 +01:00
// The last processID created
lastProcessID int
2021-04-12 10:51:26 +02:00
//
promTotalProcesses prometheus . Gauge
2021-04-12 15:35:20 +02:00
//
promProcessesVec * prometheus . GaugeVec
2021-03-03 15:44:32 +01:00
}
// newProcesses will prepare and return a *processes
2021-04-12 10:51:26 +02:00
func newProcesses ( promRegistry * prometheus . Registry ) * processes {
2021-03-03 15:44:32 +01:00
p := processes {
2021-06-08 13:56:31 +02:00
active : make ( map [ processName ] map [ int ] process ) ,
2021-03-03 15:44:32 +01:00
}
2021-04-12 10:51:26 +02:00
p . promTotalProcesses = promauto . NewGauge ( prometheus . GaugeOpts {
Name : "total_running_processes" ,
Help : "The current number of total running processes" ,
} )
2021-04-12 15:35:20 +02:00
p . promProcessesVec = promauto . NewGaugeVec ( prometheus . GaugeOpts {
Name : "running_process" ,
Help : "Name of the running process" ,
} , [ ] string { "processName" } ,
)
2021-03-03 15:44:32 +01:00
return & p
}
2021-01-28 11:17:54 +01:00
// server is the structure that will hold the state about spawned
// processes on a local instance.
2021-01-27 14:02:57 +01:00
type server struct {
2021-03-01 20:49:43 +01:00
// Configuration options used for running the server
configuration * Configuration
// The nats connection to the broker
2021-01-27 14:02:57 +01:00
natsConn * nats . Conn
2021-03-30 10:37:16 +02:00
// net listener for communicating via the socket
netListener net . Listener
2021-03-03 15:44:32 +01:00
// processes holds all the information about running processes
processes * processes
2021-02-04 13:26:10 +01:00
// The name of the node
nodeName string
2021-02-24 10:58:02 +01:00
// Mutex for locking when writing to the process map
2021-03-29 13:36:30 +02:00
toRingbufferCh chan [ ] subjectAndMessage
2021-02-24 10:58:02 +01:00
// errorKernel is doing all the error handling like what to do if
// an error occurs.
2021-02-05 13:56:42 +01:00
errorKernel * errorKernel
2021-02-18 12:29:14 +01:00
// metric exporter
metrics * metrics
2021-01-27 14:02:57 +01:00
}
2021-01-28 11:17:54 +01:00
// newServer will prepare and return a server type
2021-03-01 20:49:43 +01:00
func NewServer ( c * Configuration ) ( * server , error ) {
2021-04-19 21:06:37 +02:00
var opt nats . Option
if c . RootCAPath != "" {
opt = nats . RootCAs ( c . RootCAPath )
}
2021-05-20 12:27:25 +02:00
if c . NkeySeedFile != "" {
var err error
// fh, err := os.Open(c.NkeySeedFile)
// if err != nil {
// return nil, fmt.Errorf("error: failed to open nkey seed file: %v\n", err)
// }
// b, err := io.ReadAll(fh)
// if err != nil {
// return nil, fmt.Errorf("error: failed to read nkey seed file: %v\n", err)
// }
opt , err = nats . NkeyOptionFromSeed ( c . NkeySeedFile )
if err != nil {
return nil , fmt . Errorf ( "error: failed to read nkey seed file: %v" , err )
}
}
2021-05-25 10:23:27 +02:00
// Connect to the nats server, and retry until succesful.
var conn * nats . Conn
const connRetryWait = 5
for {
var err error
// Setting MaxReconnects to -1 which equals unlimited.
conn , err = nats . Connect ( c . BrokerAddress , opt , nats . MaxReconnects ( - 1 ) )
// Nats use string types for errors, so we need to check the content of the error.
// If no servers where available, we loop and retry until succesful.
if err != nil {
if strings . Contains ( err . Error ( ) , "no servers available" ) {
log . Printf ( "error: could not connect, waiting 5 seconds, and retrying: %v\n" , err )
time . Sleep ( time . Duration ( time . Second * connRetryWait ) )
continue
}
er := fmt . Errorf ( "error: nats.Connect failed: %v" , err )
return nil , er
}
2021-02-01 11:13:38 +01:00
2021-05-25 10:23:27 +02:00
break
}
2021-03-30 10:37:16 +02:00
// Prepare the connection to the socket file
2021-05-12 09:50:03 +02:00
// Check if socket folder exists, if not create it
if _ , err := os . Stat ( c . SocketFolder ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( c . SocketFolder , 0700 )
if err != nil {
2021-05-20 12:27:25 +02:00
return nil , fmt . Errorf ( "error: failed to create socket folder directory %v: %v" , c . SocketFolder , err )
2021-05-12 09:50:03 +02:00
}
}
socketFilepath := filepath . Join ( c . SocketFolder , "steward.sock" )
if _ , err := os . Stat ( socketFilepath ) ; ! os . IsNotExist ( err ) {
err = os . Remove ( socketFilepath )
if err != nil {
er := fmt . Errorf ( "error: could not delete sock file: %v" , err )
return nil , er
}
2021-03-30 10:37:16 +02:00
}
2021-05-12 09:50:03 +02:00
nl , err := net . Listen ( "unix" , socketFilepath )
2021-03-30 10:37:16 +02:00
if err != nil {
2021-04-16 13:18:10 +02:00
er := fmt . Errorf ( "error: failed to open socket: %v" , err )
return nil , er
2021-03-30 10:37:16 +02:00
}
2021-03-31 08:56:13 +02:00
metrics := newMetrics ( c . PromHostAndPort )
2021-02-01 13:41:04 +01:00
s := & server {
2021-03-29 13:36:30 +02:00
configuration : c ,
nodeName : c . NodeName ,
natsConn : conn ,
2021-03-30 10:37:16 +02:00
netListener : nl ,
2021-04-12 10:51:26 +02:00
processes : newProcesses ( metrics . promRegistry ) ,
2021-03-29 13:36:30 +02:00
toRingbufferCh : make ( chan [ ] subjectAndMessage ) ,
2021-03-31 08:56:13 +02:00
metrics : metrics ,
2021-02-01 13:41:04 +01:00
}
2021-01-29 06:09:48 +01:00
2021-03-02 13:46:02 +01:00
// Create the default data folder for where subscribers should
2021-03-25 12:50:58 +01:00
// write it's data, check if data folder exist, and create it if needed.
2021-03-02 13:46:02 +01:00
if _ , err := os . Stat ( c . SubscribersDataFolder ) ; os . IsNotExist ( err ) {
if c . SubscribersDataFolder == "" {
return nil , fmt . Errorf ( "error: subscribersDataFolder value is empty, you need to provide the config or the flag value at startup %v: %v" , c . SubscribersDataFolder , err )
}
err := os . Mkdir ( c . SubscribersDataFolder , 0700 )
if err != nil {
2021-05-20 12:27:25 +02:00
return nil , fmt . Errorf ( "error: failed to create data folder directory %v: %v" , c . SubscribersDataFolder , err )
2021-03-02 13:46:02 +01:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , c . SubscribersDataFolder )
}
2021-02-05 07:25:12 +01:00
return s , nil
}
2021-02-24 10:58:02 +01:00
// Start will spawn up all the predefined subscriber processes.
2021-02-10 07:25:44 +01:00
// Spawning of publisher processes is done on the fly by checking
2021-02-24 10:58:02 +01:00
// if there is publisher process for a given message subject, and
// not exist it will spawn one.
2021-02-10 05:11:48 +01:00
func ( s * server ) Start ( ) {
2021-02-19 11:07:09 +01:00
// Start the error kernel that will do all the error handling
// not done within a process.
s . errorKernel = newErrorKernel ( )
2021-03-29 13:36:30 +02:00
s . errorKernel . startErrorKernel ( s . toRingbufferCh )
2021-02-19 11:07:09 +01:00
2021-02-18 12:29:14 +01:00
// Start collecting the metrics
go s . startMetrics ( )
2021-03-29 13:36:30 +02:00
// Start the checking the input socket for new messages from operator.
go s . readSocket ( s . toRingbufferCh )
2021-02-26 09:02:53 +01:00
2021-04-09 11:30:40 +02:00
// Start up the predefined subscribers. Since all the logic to handle
// processes are tied to the process struct, we need to create an
// initial process to start the rest.
2021-04-08 13:43:47 +02:00
sub := newSubject ( REQInitial , s . nodeName )
p := newProcess ( s . natsConn , s . processes , s . toRingbufferCh , s . configuration , sub , s . errorKernel . errorCh , "" , [ ] node { } , nil )
p . ProcessesStart ( )
2021-02-18 14:27:53 +01:00
2021-03-04 16:27:55 +01:00
time . Sleep ( time . Second * 1 )
2021-03-31 08:56:13 +02:00
s . processes . printProcessesMap ( )
2021-02-10 07:25:44 +01:00
2021-03-09 07:43:55 +01:00
// Start the processing of new messages from an input channel.
2021-05-12 09:50:03 +02:00
s . routeMessagesToProcess ( "./incomingBuffer.db" , s . toRingbufferCh )
2021-02-05 10:47:07 +01:00
select { }
}
2021-03-31 08:56:13 +02:00
func ( p * processes ) printProcessesMap ( ) {
2021-02-10 07:25:44 +01:00
fmt . Println ( "--------------------------------------------------------------------------------------------" )
2021-04-16 13:43:58 +02:00
log . Printf ( "*** Output of processes map :\n" )
2021-03-31 08:56:13 +02:00
p . mu . Lock ( )
2021-06-08 13:56:31 +02:00
for _ , vSub := range p . active {
for _ , vID := range vSub {
log . Printf ( "* proc - : %v, id: %v, name: %v, allowed from: %v\n" , vID . processKind , vID . processID , vID . subject . name ( ) , vID . allowedReceivers )
}
2021-02-10 07:25:44 +01:00
}
2021-03-31 08:56:13 +02:00
p . mu . Unlock ( )
2021-02-19 16:58:16 +01:00
2021-04-12 10:51:26 +02:00
p . promTotalProcesses . Set ( float64 ( len ( p . active ) ) )
2021-02-19 16:58:16 +01:00
2021-02-10 07:25:44 +01:00
fmt . Println ( "--------------------------------------------------------------------------------------------" )
}
2021-02-25 11:08:05 +01:00
// sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
func sendErrorLogMessage ( newMessagesCh chan <- [ ] subjectAndMessage , FromNode node , theError error ) {
2021-03-26 05:13:51 +01:00
// NB: Adding log statement here for more visuality during development.
log . Printf ( "%v\n" , theError )
2021-02-25 11:08:05 +01:00
sam := createErrorMsgContent ( FromNode , theError )
newMessagesCh <- [ ] subjectAndMessage { sam }
}
2021-02-24 15:43:31 +01:00
2021-02-25 11:08:05 +01:00
// createErrorMsgContent will prepare a subject and message with the content
// of the error
func createErrorMsgContent ( FromNode node , theError error ) subjectAndMessage {
2021-04-06 09:06:26 +02:00
// Add time stamp
er := fmt . Sprintf ( "%v, %v\n" , time . Now ( ) . UTC ( ) , theError . Error ( ) )
2021-02-24 15:43:31 +01:00
sam := subjectAndMessage {
2021-04-06 07:56:49 +02:00
Subject : newSubject ( REQErrorLog , "errorCentral" ) ,
2021-02-24 15:43:31 +01:00
Message : Message {
2021-04-09 11:30:40 +02:00
Directory : "errorLog" ,
ToNode : "errorCentral" ,
FromNode : FromNode ,
FileExtension : ".log" ,
Data : [ ] string { er } ,
Method : REQErrorLog ,
2021-02-24 15:43:31 +01:00
} ,
}
2021-02-25 11:08:05 +01:00
return sam
2021-02-24 15:43:31 +01:00
}
2021-03-09 07:43:55 +01:00
2021-03-10 07:11:14 +01:00
// routeMessagesToProcess takes a database name and an input channel as
2021-03-09 07:43:55 +01:00
// it's input arguments.
// The database will be used as the persistent store for the work queue
// which is implemented as a ring buffer.
// The input channel are where we read new messages to publish.
// Incomming messages will be routed to the correct subject process, where
// the handling of each nats subject is handled within it's own separate
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
2021-03-10 07:11:14 +01:00
func ( s * server ) routeMessagesToProcess ( dbFileName string , newSAM chan [ ] subjectAndMessage ) {
2021-03-09 07:43:55 +01:00
// Prepare and start a new ring buffer
const bufferSize int = 1000
2021-05-12 09:50:03 +02:00
rb := newringBuffer ( * s . configuration , bufferSize , dbFileName , node ( s . nodeName ) , s . toRingbufferCh )
2021-03-09 07:43:55 +01:00
inCh := make ( chan subjectAndMessage )
ringBufferOutCh := make ( chan samDBValue )
// start the ringbuffer.
rb . start ( inCh , ringBufferOutCh , s . configuration . DefaultMessageTimeout , s . configuration . DefaultMessageRetries )
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.
go func ( ) {
for samSlice := range newSAM {
for _ , sam := range samSlice {
inCh <- sam
}
}
close ( inCh )
} ( )
// Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and if no subject
// exist throw an error.
var coe CommandOrEvent
coeAvailable := coe . GetCommandOrEventAvailable ( )
var method Method
methodsAvailable := method . GetMethodsAvailable ( )
go func ( ) {
for samTmp := range ringBufferOutCh {
sam := samTmp . Data
// Check if the format of the message is correct.
if _ , ok := methodsAvailable . CheckIfExists ( sam . Message . Method ) ; ! ok {
2021-03-12 12:08:11 +01:00
er := fmt . Errorf ( "error: routeMessagesToProcess: the method do not exist, message dropped: %v" , sam . Message . Method )
2021-03-29 13:36:30 +02:00
sendErrorLogMessage ( s . toRingbufferCh , node ( s . nodeName ) , er )
2021-03-09 07:43:55 +01:00
continue
}
if ! coeAvailable . CheckIfExists ( sam . Subject . CommandOrEvent , sam . Subject ) {
2021-03-12 12:08:11 +01:00
er := fmt . Errorf ( "error: routeMessagesToProcess: the command or event do not exist, message dropped: %v" , sam . Message . Method )
2021-03-29 13:36:30 +02:00
sendErrorLogMessage ( s . toRingbufferCh , node ( s . nodeName ) , er )
2021-03-12 12:08:11 +01:00
2021-03-09 07:43:55 +01:00
continue
}
redo :
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present. The process will then be created, and
// the code will loop back to the redo: label.
m := sam . Message
subjName := sam . Subject . name ( )
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet ( subjName , processKindPublisher )
2021-03-09 11:58:50 +01:00
2021-06-08 13:56:31 +02:00
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
2021-03-09 11:58:50 +01:00
s . processes . mu . Lock ( )
2021-06-08 13:56:31 +02:00
existingProcIDMap , ok := s . processes . active [ pn ]
2021-03-09 11:58:50 +01:00
s . processes . mu . Unlock ( )
2021-03-09 07:43:55 +01:00
2021-06-08 13:56:31 +02:00
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
2021-03-09 07:43:55 +01:00
if ok {
2021-06-08 13:56:31 +02:00
s . processes . mu . Lock ( )
for _ , existingProc := range existingProcIDMap {
log . Printf ( "info: processNewMessages: found the specific subject: %v\n" , subjName )
existingProc . subject . messageCh <- m
}
s . processes . mu . Unlock ( )
2021-03-09 07:43:55 +01:00
// If no process to handle the specific subject exist,
// the we create and spawn one.
} else {
// If a publisher process do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log . Printf ( "info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n" , subjName )
2021-04-03 07:33:03 +02:00
sub := newSubject ( sam . Subject . Method , sam . Subject . ToNode )
2021-04-07 16:45:51 +02:00
proc := newProcess ( s . natsConn , s . processes , s . toRingbufferCh , s . configuration , sub , s . errorKernel . errorCh , processKindPublisher , nil , nil )
2021-03-09 07:43:55 +01:00
// fmt.Printf("*** %#v\n", proc)
2021-04-07 16:45:51 +02:00
proc . spawnWorker ( s . processes , s . natsConn )
2021-03-09 07:43:55 +01:00
// Now when the process is spawned we jump back to the redo: label,
// and send the message to that new process.
goto redo
}
}
} ( )
}