2021-01-28 11:17:54 +01:00
// Notes:
2021-02-01 11:13:38 +01:00
package steward
2021-01-25 15:23:00 +01:00
import (
"bytes"
"encoding/gob"
"fmt"
"log"
2021-03-02 13:46:02 +01:00
"os"
2021-01-28 14:58:16 +01:00
"sync"
2021-01-25 15:23:00 +01:00
"time"
"github.com/nats-io/nats.go"
2021-02-19 16:58:16 +01:00
"github.com/prometheus/client_golang/prometheus"
2021-01-25 15:23:00 +01:00
)
2021-02-26 07:55:28 +01:00
type processName string
func processNameGet ( sn subjectName , pk processKind ) processName {
pn := fmt . Sprintf ( "%s_%s" , sn , pk )
return processName ( pn )
}
2021-01-28 11:17:54 +01:00
// server is the structure that will hold the state about spawned
// processes on a local instance.
2021-01-27 14:02:57 +01:00
type server struct {
2021-03-01 20:49:43 +01:00
// Configuration options used for running the server
configuration * Configuration
// The nats connection to the broker
2021-01-27 14:02:57 +01:00
natsConn * nats . Conn
2021-01-28 11:17:54 +01:00
// TODO: sessions should probably hold a slice/map of processes ?
2021-02-26 07:55:28 +01:00
processes map [ processName ] process
2021-01-28 11:17:54 +01:00
// The last processID created
lastProcessID int
2021-02-04 13:26:10 +01:00
// The name of the node
nodeName string
2021-02-24 10:58:02 +01:00
// Mutex for locking when writing to the process map
mu sync . Mutex
2021-02-24 15:43:31 +01:00
// The channel where we put new messages read from file,
// or some other process who wants to send something via the
// system
2021-02-24 10:58:02 +01:00
// We can than range this channel for new messages to process.
2021-02-24 15:43:31 +01:00
newMessagesCh chan [ ] subjectAndMessage
2021-02-24 10:58:02 +01:00
// errorKernel is doing all the error handling like what to do if
// an error occurs.
// TODO: Will also send error messages to cental error subscriber.
2021-02-05 13:56:42 +01:00
errorKernel * errorKernel
2021-02-10 14:29:17 +01:00
// used to check if the methods specified in message is valid
methodsAvailable MethodsAvailable
2021-02-17 18:59:49 +01:00
// Map who holds the command and event types available.
// Used to check if the commandOrEvent specified in message is valid
2021-02-10 14:29:17 +01:00
commandOrEventAvailable CommandOrEventAvailable
2021-02-18 12:29:14 +01:00
// metric exporter
metrics * metrics
2021-02-24 10:58:02 +01:00
// subscriberServices are where we find the services and the API to
// use services needed by subscriber.
// For example, this can be a service that knows
// how to forward the data for a received message of type log to a
// central logger.
subscriberServices * subscriberServices
2021-02-24 15:43:31 +01:00
// Is this the central error logger ?
2021-02-26 09:02:53 +01:00
// collection of the publisher services and the types to control them
publisherServices * publisherServices
2021-02-24 15:43:31 +01:00
centralErrorLogger bool
2021-02-25 13:08:10 +01:00
// default message timeout in seconds. This can be overridden on the message level
defaultMessageTimeout int
// default amount of retries that will be done before a message is thrown away, and out of the system
defaultMessageRetries int
2021-01-27 14:02:57 +01:00
}
2021-01-28 11:17:54 +01:00
// newServer will prepare and return a server type
2021-03-01 20:49:43 +01:00
func NewServer ( c * Configuration ) ( * server , error ) {
conn , err := nats . Connect ( c . BrokerAddress , nil )
2021-02-01 11:13:38 +01:00
if err != nil {
log . Printf ( "error: nats.Connect failed: %v\n" , err )
}
2021-02-10 14:29:17 +01:00
var m Method
2021-02-24 10:58:02 +01:00
var coe CommandOrEvent
2021-02-10 14:29:17 +01:00
2021-02-01 13:41:04 +01:00
s := & server {
2021-03-01 20:49:43 +01:00
configuration : c ,
nodeName : c . NodeName ,
2021-02-10 14:29:17 +01:00
natsConn : conn ,
2021-02-26 07:55:28 +01:00
processes : make ( map [ processName ] process ) ,
2021-02-24 15:43:31 +01:00
newMessagesCh : make ( chan [ ] subjectAndMessage ) ,
2021-02-10 14:29:17 +01:00
methodsAvailable : m . GetMethodsAvailable ( ) ,
2021-02-24 10:58:02 +01:00
commandOrEventAvailable : coe . GetCommandOrEventAvailable ( ) ,
2021-03-01 20:49:43 +01:00
metrics : newMetrics ( c . PromHostAndPort ) ,
2021-02-24 10:58:02 +01:00
subscriberServices : newSubscriberServices ( ) ,
2021-03-01 20:49:43 +01:00
publisherServices : newPublisherServices ( c . PublisherServiceSayhello ) ,
centralErrorLogger : c . CentralErrorLogger ,
defaultMessageTimeout : c . DefaultMessageTimeout ,
defaultMessageRetries : c . DefaultMessageRetries ,
2021-02-01 13:41:04 +01:00
}
2021-01-29 06:09:48 +01:00
2021-03-02 13:46:02 +01:00
// Create the default data folder for where subscribers should
// write it's data if needed.
// Check if data folder exist, and create it if needed.
if _ , err := os . Stat ( c . SubscribersDataFolder ) ; os . IsNotExist ( err ) {
if c . SubscribersDataFolder == "" {
return nil , fmt . Errorf ( "error: subscribersDataFolder value is empty, you need to provide the config or the flag value at startup %v: %v" , c . SubscribersDataFolder , err )
}
err := os . Mkdir ( c . SubscribersDataFolder , 0700 )
if err != nil {
return nil , fmt . Errorf ( "error: failed to create directory %v: %v" , c . SubscribersDataFolder , err )
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , c . SubscribersDataFolder )
}
2021-02-05 07:25:12 +01:00
return s , nil
}
2021-02-24 10:58:02 +01:00
// Start will spawn up all the predefined subscriber processes.
2021-02-10 07:25:44 +01:00
// Spawning of publisher processes is done on the fly by checking
2021-02-24 10:58:02 +01:00
// if there is publisher process for a given message subject, and
// not exist it will spawn one.
2021-02-10 05:11:48 +01:00
func ( s * server ) Start ( ) {
2021-02-19 11:07:09 +01:00
// Start the error kernel that will do all the error handling
// not done within a process.
s . errorKernel = newErrorKernel ( )
2021-02-24 15:43:31 +01:00
s . errorKernel . startErrorKernel ( s . newMessagesCh )
2021-02-19 11:07:09 +01:00
2021-02-18 12:29:14 +01:00
// Start collecting the metrics
go s . startMetrics ( )
2021-02-05 10:47:07 +01:00
// Start the checking the input file for new messages from operator.
2021-02-24 15:43:31 +01:00
go s . getMessagesFromFile ( "./" , "inmsg.txt" , s . newMessagesCh )
2021-02-02 13:06:37 +01:00
2021-02-26 15:11:20 +01:00
// if enabled, start the sayHello I'm here service at the given interval
2021-02-26 09:02:53 +01:00
if s . publisherServices . sayHelloPublisher . interval != 0 {
go s . publisherServices . sayHelloPublisher . start ( s . newMessagesCh , node ( s . nodeName ) )
}
2021-02-24 10:58:02 +01:00
// Start up the predefined subscribers.
// TODO: What to subscribe on should be handled via flags, or config
// files.
s . subscribersStart ( )
2021-02-18 14:27:53 +01:00
2021-02-10 05:11:48 +01:00
time . Sleep ( time . Second * 2 )
2021-02-10 07:25:44 +01:00
s . printProcessesMap ( )
2021-02-24 10:58:02 +01:00
// Start the processing of new messaging from an input channel.
2021-02-24 15:43:31 +01:00
s . processNewMessages ( "./incommmingBuffer.db" , s . newMessagesCh )
2021-02-05 10:47:07 +01:00
select { }
}
2021-02-10 07:25:44 +01:00
func ( s * server ) printProcessesMap ( ) {
fmt . Println ( "--------------------------------------------------------------------------------------------" )
fmt . Printf ( "*** Output of processes map :\n" )
for _ , v := range s . processes {
fmt . Printf ( "*** - : %v\n" , v )
}
2021-02-19 16:58:16 +01:00
s . metrics . metricsCh <- metricType {
metric : prometheus . NewGauge ( prometheus . GaugeOpts {
Name : "total_running_processes" ,
Help : "The current number of total running processes" ,
} ) ,
value : float64 ( len ( s . processes ) ) ,
}
2021-02-10 07:25:44 +01:00
fmt . Println ( "--------------------------------------------------------------------------------------------" )
}
2021-02-09 11:52:08 +01:00
// processKind are either kindSubscriber or kindPublisher, and are
// used to distinguish the kind of process to spawn and to know
// the process kind put in the process map.
2021-02-09 11:16:02 +01:00
type processKind string
const (
2021-02-09 11:52:08 +01:00
processKindSubscriber processKind = "subscriber"
processKindPublisher processKind = "publisher"
2021-02-09 11:16:02 +01:00
)
2021-01-28 14:58:16 +01:00
// process are represent the communication to one individual host
type process struct {
messageID int
2021-02-03 08:28:21 +01:00
// the subject used for the specific process. One process
// can contain only one sender on a message bus, hence
// also one subject
2021-02-03 22:08:28 +01:00
subject Subject
2021-01-28 14:58:16 +01:00
// Put a node here to be able know the node a process is at.
// NB: Might not be needed later on.
node node
// The processID for the current process
processID int
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
2021-02-09 11:52:08 +01:00
errorCh chan errProcess
processKind processKind
2021-02-26 15:11:20 +01:00
// Who are we allowed to receive from ?
allowedReceivers map [ node ] struct { }
2021-01-28 11:17:54 +01:00
}
2021-02-01 13:41:04 +01:00
// prepareNewProcess will set the the provided values and the default
// values for a process.
2021-02-26 15:11:20 +01:00
func ( s * server ) processPrepareNew ( subject Subject , errCh chan errProcess , processKind processKind , allowedReceivers [ ] node ) process {
2021-02-03 09:06:37 +01:00
// create the initial configuration for a sessions communicating with 1 host process.
2021-01-28 11:17:54 +01:00
s . lastProcessID ++
2021-02-26 15:11:20 +01:00
2021-03-02 06:51:08 +01:00
// make the slice of allowedReceivers into a map value for easy lookup.
2021-02-26 15:11:20 +01:00
m := make ( map [ node ] struct { } )
for _ , a := range allowedReceivers {
m [ a ] = struct { } { }
}
2021-01-28 11:17:54 +01:00
proc := process {
2021-02-26 15:11:20 +01:00
messageID : 0 ,
subject : subject ,
node : node ( subject . ToNode ) ,
processID : s . lastProcessID ,
errorCh : errCh ,
processKind : processKind ,
allowedReceivers : m ,
2021-01-27 14:02:57 +01:00
}
2021-01-28 11:17:54 +01:00
return proc
}
2021-02-24 10:58:02 +01:00
// spawnWorkerProcess will spawn take care of spawning both publisher
// and subscriber proesses.
//It will give the process the next available ID, and also add the
// process to the processes map.
func ( s * server ) spawnWorkerProcess ( proc process ) {
2021-02-03 22:08:28 +01:00
s . mu . Lock ( )
2021-02-03 14:53:25 +01:00
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
2021-02-26 07:55:28 +01:00
var pn processName
if proc . processKind == processKindPublisher {
pn = processNameGet ( proc . subject . name ( ) , processKindPublisher )
}
if proc . processKind == processKindSubscriber {
pn = processNameGet ( proc . subject . name ( ) , processKindSubscriber )
}
s . processes [ pn ] = proc
2021-02-03 22:08:28 +01:00
s . mu . Unlock ( )
2021-01-27 14:02:57 +01:00
2021-02-02 13:06:37 +01:00
// TODO: I think it makes most sense that the messages would come to
// here from some other message-pickup-process, and that process will
// give the message to the correct publisher process. A channel that
// is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process.
2021-02-10 07:25:44 +01:00
//
// Handle publisher workers
2021-02-09 11:52:08 +01:00
if proc . processKind == processKindPublisher {
2021-02-24 10:58:02 +01:00
s . publishMessages ( proc )
2021-01-27 09:45:52 +01:00
}
2021-02-09 13:48:02 +01:00
2021-02-10 07:25:44 +01:00
// handle subscriber workers
2021-02-09 13:48:02 +01:00
if proc . processKind == processKindSubscriber {
2021-02-24 10:58:02 +01:00
s . subscribeMessages ( proc )
2021-02-09 13:48:02 +01:00
}
2021-01-27 09:45:52 +01:00
}
2021-02-24 15:43:31 +01:00
func ( s * server ) messageDeliverNats ( proc process , message Message ) {
2021-02-25 13:08:10 +01:00
retryAttempts := 0
2021-01-27 09:45:52 +01:00
for {
2021-02-15 11:28:27 +01:00
dataPayload , err := gobEncodeMessage ( message )
2021-01-27 09:45:52 +01:00
if err != nil {
log . Printf ( "error: createDataPayload: %v\n" , err )
}
msg := & nats . Msg {
2021-02-03 12:55:02 +01:00
Subject : string ( proc . subject . name ( ) ) ,
2021-03-01 12:16:36 +01:00
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"),
2021-01-29 06:09:48 +01:00
// Structure of the reply message are:
2021-01-29 14:22:36 +01:00
// reply.<nodename>.<message type>.<method>
2021-02-03 12:55:02 +01:00
Reply : fmt . Sprintf ( "reply.%s" , proc . subject . name ( ) ) ,
2021-01-29 06:09:48 +01:00
Data : dataPayload ,
2021-01-25 15:23:00 +01:00
}
2021-01-27 09:45:52 +01:00
// The SubscribeSync used in the subscriber, will get messages that
// are sent after it started subscribing, so we start a publisher
// that sends out a message every second.
//
// Create a subscriber for the reply message.
2021-02-24 15:43:31 +01:00
subReply , err := s . natsConn . SubscribeSync ( msg . Reply )
2021-01-27 09:45:52 +01:00
if err != nil {
2021-02-25 11:08:05 +01:00
log . Printf ( "error: nc.SubscribeSync failed: failed to create reply message: %v\n" , err )
2021-01-27 09:45:52 +01:00
continue
}
// Publish message
2021-02-24 15:43:31 +01:00
err = s . natsConn . PublishMsg ( msg )
2021-01-27 09:45:52 +01:00
if err != nil {
log . Printf ( "error: publish failed: %v\n" , err )
continue
}
2021-02-18 14:27:53 +01:00
// If the message is an ACK type of message we must check that a
// reply, and if it is not we don't wait here at all.
2021-03-02 13:46:02 +01:00
fmt . Printf ( "info: messageDeliverNats: preparing to send message: %v\n" , message )
2021-03-01 17:08:40 +01:00
if proc . subject . CommandOrEvent == CommandACK || proc . subject . CommandOrEvent == EventACK {
2021-02-18 14:27:53 +01:00
// Wait up until 10 seconds for a reply,
// continue and resend if to reply received.
2021-02-25 13:08:10 +01:00
msgReply , err := subReply . NextMsg ( time . Second * time . Duration ( message . Timeout ) )
2021-02-18 14:27:53 +01:00
if err != nil {
log . Printf ( "error: subReply.NextMsg failed for node=%v, subject=%v: %v\n" , proc . node , proc . subject . name ( ) , err )
2021-02-25 13:08:10 +01:00
// did not receive a reply, decide what to do..
retryAttempts ++
fmt . Printf ( "Retry attempts:%v, retries: %v, timeout: %v\n" , retryAttempts , message . Retries , message . Timeout )
switch {
case message . Retries == 0 :
// 0 indicates unlimited retries
continue
case retryAttempts >= message . Retries :
// max retries reached
log . Printf ( "info: max retries for message reached, breaking out: %v" , retryAttempts )
return
default :
// none of the above matched, so we've not reached max retries yet
continue
}
2021-02-18 14:27:53 +01:00
}
2021-02-26 15:11:20 +01:00
log . Printf ( "<--- publisher: received ACK for message: %s\n" , msgReply . Data )
2021-01-27 09:45:52 +01:00
}
return
}
}
2021-02-10 07:25:44 +01:00
// handler will deserialize the message when a new message is received,
// check the MessageType field in the message to decide what kind of
// message it is and then it will check how to handle that message type,
// and handle it.
// This handler function should be started in it's own go routine,so
// one individual handler is started per message received so we can keep
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
2021-02-26 15:11:20 +01:00
func ( s * server ) subscriberHandler ( natsConn * nats . Conn , thisNode string , msg * nats . Msg , proc process ) {
2021-02-10 07:25:44 +01:00
message := Message { }
// Create a buffer to decode the gob encoded binary data back
// to it's original structure.
buf := bytes . NewBuffer ( msg . Data )
gobDec := gob . NewDecoder ( buf )
err := gobDec . Decode ( & message )
if err != nil {
log . Printf ( "error: gob decoding failed: %v\n" , err )
}
//fmt.Printf("%v\n", msg)
// TODO: Maybe the handling of the errors within the subscriber
// should also involve the error-kernel to report back centrally
// that there was a problem like missing method to handle a specific
// method etc.
switch {
2021-03-01 17:08:40 +01:00
case proc . subject . CommandOrEvent == CommandACK || proc . subject . CommandOrEvent == EventACK :
2021-03-02 13:46:02 +01:00
log . Printf ( "info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n" , proc . subject . name ( ) )
2021-02-11 15:51:07 +01:00
mf , ok := s . methodsAvailable . CheckIfExists ( message . Method )
if ! ok {
// TODO: Check how errors should be handled here!!!
2021-03-01 17:08:40 +01:00
log . Printf ( "error: subscriberHandler: method type not available: %v\n" , proc . subject . CommandOrEvent )
2021-02-10 07:25:44 +01:00
}
2021-02-11 15:07:03 +01:00
2021-02-26 15:11:20 +01:00
out := [ ] byte ( "not allowed from " + message . FromNode )
var err error
2021-03-02 06:51:08 +01:00
// Check if we are allowed to receive from that host
2021-03-02 13:46:02 +01:00
_ , arOK1 := proc . allowedReceivers [ message . FromNode ]
_ , arOK2 := proc . allowedReceivers [ "*" ]
2021-02-26 15:11:20 +01:00
2021-03-02 13:46:02 +01:00
if arOK1 || arOK2 {
out , err = mf . handler ( s , proc , message , thisNode )
2021-02-26 15:11:20 +01:00
if err != nil {
// TODO: Send to error kernel ?
log . Printf ( "error: subscriberHandler: failed to execute event: %v\n" , err )
}
2021-03-02 13:46:02 +01:00
} else {
log . Printf ( "info: we don't allow receiving from: %v, %v\n" , message . FromNode , proc . subject )
2021-02-10 07:25:44 +01:00
}
// Send a confirmation message back to the publisher
2021-02-11 15:07:03 +01:00
natsConn . Publish ( msg . Reply , out )
2021-02-24 15:43:31 +01:00
2021-02-25 11:08:05 +01:00
// TESTING: Simulate that we also want to send some error that occured
// to the errorCentral
{
err := fmt . Errorf ( "error: some testing error we want to send out" )
sendErrorLogMessage ( s . newMessagesCh , node ( thisNode ) , err )
}
2021-03-01 17:08:40 +01:00
case proc . subject . CommandOrEvent == CommandNACK || proc . subject . CommandOrEvent == EventNACK :
2021-03-02 13:46:02 +01:00
log . Printf ( "info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n" , proc . subject . name ( ) )
2021-02-18 14:27:53 +01:00
mf , ok := s . methodsAvailable . CheckIfExists ( message . Method )
if ! ok {
// TODO: Check how errors should be handled here!!!
2021-03-01 17:08:40 +01:00
log . Printf ( "error: subscriberHandler: method type not available: %v\n" , proc . subject . CommandOrEvent )
2021-02-18 14:27:53 +01:00
}
// since we don't send a reply for a NACK message, we don't care about the
// out return when calling mf.handler
2021-03-02 13:46:02 +01:00
_ , err := mf . handler ( s , proc , message , thisNode )
2021-02-18 14:27:53 +01:00
if err != nil {
// TODO: Send to error kernel ?
log . Printf ( "error: subscriberHandler: failed to execute event: %v\n" , err )
}
2021-02-10 07:25:44 +01:00
default :
2021-03-01 17:08:40 +01:00
log . Printf ( "info: did not find that specific type of command: %#v\n" , proc . subject . CommandOrEvent )
2021-02-10 07:25:44 +01:00
}
}
2021-02-24 15:43:31 +01:00
2021-02-25 11:08:05 +01:00
// sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
func sendErrorLogMessage ( newMessagesCh chan <- [ ] subjectAndMessage , FromNode node , theError error ) {
2021-02-24 15:43:31 +01:00
// --- Testing
2021-02-25 11:08:05 +01:00
sam := createErrorMsgContent ( FromNode , theError )
newMessagesCh <- [ ] subjectAndMessage { sam }
}
2021-02-24 15:43:31 +01:00
2021-02-25 11:08:05 +01:00
// createErrorMsgContent will prepare a subject and message with the content
// of the error
func createErrorMsgContent ( FromNode node , theError error ) subjectAndMessage {
2021-02-24 15:43:31 +01:00
// TESTING: Creating an error message to send to errorCentral
fmt . Printf ( " --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" )
sam := subjectAndMessage {
Subject : Subject {
ToNode : "errorCentral" ,
CommandOrEvent : EventNACK ,
Method : ErrorLog ,
} ,
Message : Message {
2021-03-01 17:08:40 +01:00
ToNode : "errorCentral" ,
FromNode : FromNode ,
Data : [ ] string { theError . Error ( ) } ,
Method : ErrorLog ,
2021-02-24 15:43:31 +01:00
} ,
}
2021-02-25 11:08:05 +01:00
return sam
2021-02-24 15:43:31 +01:00
}