2021-01-28 10:17:54 +00:00
// Notes:
2021-02-01 10:13:38 +00:00
package steward
2021-01-25 14:23:00 +00:00
import (
2021-07-02 09:26:52 +00:00
"context"
2021-01-25 14:23:00 +00:00
"fmt"
"log"
2021-03-30 08:37:16 +00:00
"net"
2021-08-23 10:47:33 +00:00
"net/http"
2021-03-02 12:46:02 +00:00
"os"
2021-05-12 07:50:03 +00:00
"path/filepath"
2021-01-25 14:23:00 +00:00
"time"
"github.com/nats-io/nats.go"
2021-09-10 08:21:33 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-01-25 14:23:00 +00:00
)
2021-02-26 06:55:28 +00:00
// processName is the name used to identify a process. It is made up of the
// subject name and the process kind, see processNameGet.
type processName string
2021-03-09 03:55:51 +00:00
// Will return a process name made up of subjectName+processKind
2021-02-26 06:55:28 +00:00
func processNameGet ( sn subjectName , pk processKind ) processName {
pn := fmt . Sprintf ( "%s_%s" , sn , pk )
return processName ( pn )
}
2021-01-28 10:17:54 +00:00
// server is the structure that will hold the state about spawned
// processes on a local instance.
2021-01-27 13:02:57 +00:00
type server struct {
2021-07-02 09:26:52 +00:00
// The main background context
ctx context . Context
// The CancelFunc for the main context
2021-08-11 10:23:37 +00:00
cancel context . CancelFunc
2021-03-01 19:49:43 +00:00
// Configuration options used for running the server
configuration * Configuration
// The nats connection to the broker
2021-01-27 13:02:57 +00:00
natsConn * nats . Conn
2021-07-02 06:38:44 +00:00
// net listener for communicating via the steward socket
2021-08-09 12:41:31 +00:00
StewardSocket net . Listener
2021-07-02 06:38:44 +00:00
// net listener for the communication with Stew
2021-08-09 12:41:31 +00:00
StewSocket net . Listener
2021-03-03 14:44:32 +00:00
// processes holds all the information about running processes
processes * processes
2021-02-04 12:26:10 +00:00
// The name of the node
nodeName string
2021-08-25 06:31:48 +00:00
// newMessagesCh are the channel where new messages to be handled
2021-11-09 12:18:58 +00:00
// by the system are put. So if any process want's to send a message
// like an error message you just put the message on the newMessagesCh.
//
// In general the ringbuffer will read this
// channel for messages to put on the buffer.
//
// Example:
// A message is read from the socket, then put on the newMessagesCh
// and then put on the ringbuffer.
2021-08-25 06:31:48 +00:00
newMessagesCh chan [ ] subjectAndMessage
2021-02-24 09:58:02 +00:00
// errorKernel is doing all the error handling like what to do if
// an error occurs.
2021-02-05 12:56:42 +00:00
errorKernel * errorKernel
2021-09-15 06:39:34 +00:00
// Ring buffer
ringBuffer * ringBuffer
2021-02-18 11:29:14 +00:00
// metric exporter
metrics * metrics
2021-09-10 08:04:25 +00:00
// Version of package
version string
2021-01-27 13:02:57 +00:00
}
2021-01-28 10:17:54 +00:00
// newServer will prepare and return a server type
2021-09-10 08:04:25 +00:00
func NewServer ( c * Configuration , version string ) ( * server , error ) {
2021-08-11 10:23:37 +00:00
// Set up the main background context.
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2021-04-19 19:06:37 +00:00
var opt nats . Option
2021-08-16 11:01:12 +00:00
2021-04-19 19:06:37 +00:00
if c . RootCAPath != "" {
opt = nats . RootCAs ( c . RootCAPath )
}
2021-05-20 10:27:25 +00:00
if c . NkeySeedFile != "" {
var err error
2021-08-11 10:23:37 +00:00
2021-05-20 10:27:25 +00:00
opt , err = nats . NkeyOptionFromSeed ( c . NkeySeedFile )
if err != nil {
2021-07-02 09:26:52 +00:00
cancel ( )
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to read nkey seed file: %v" , err )
}
}
2021-05-25 08:23:27 +00:00
var conn * nats . Conn
2021-08-16 11:01:12 +00:00
// Connect to the nats server, and retry until succesful.
2021-05-25 08:23:27 +00:00
for {
var err error
// Setting MaxReconnects to -1 which equals unlimited.
2021-12-16 10:01:01 +00:00
conn , err = nats . Connect ( c . BrokerAddress , opt , nats . MaxReconnects ( - 1 ) , nats . ReconnectJitter ( time . Duration ( c . NatsReconnectJitter ) * time . Millisecond , time . Duration ( c . NatsReconnectJitterTLS ) * time . Second ) )
2021-05-25 08:23:27 +00:00
// If no servers where available, we loop and retry until succesful.
if err != nil {
2021-09-01 11:39:54 +00:00
log . Printf ( "error: could not connect, waiting %v seconds, and retrying: %v\n" , c . NatsConnectRetryInterval , err )
time . Sleep ( time . Duration ( time . Second * time . Duration ( c . NatsConnectRetryInterval ) ) )
2021-09-01 11:25:02 +00:00
continue
2021-05-25 08:23:27 +00:00
}
2021-02-01 10:13:38 +00:00
2021-05-25 08:23:27 +00:00
break
}
2021-08-11 10:23:37 +00:00
2021-12-16 10:01:01 +00:00
log . Printf ( " * conn.Opts.ReconnectJitterTLS: %v\n" , conn . Opts . ReconnectJitterTLS )
log . Printf ( " * conn.Opts.ReconnectJitter: %v\n" , conn . Opts . ReconnectJitter )
2021-07-02 06:38:44 +00:00
// Prepare the connection to the Steward socket file
2021-05-12 07:50:03 +00:00
// Check if socket folder exists, if not create it
if _ , err := os . Stat ( c . SocketFolder ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( c . SocketFolder , 0700 )
if err != nil {
2021-07-02 09:26:52 +00:00
cancel ( )
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to create socket folder directory %v: %v" , c . SocketFolder , err )
2021-05-12 07:50:03 +00:00
}
}
2021-08-11 10:23:37 +00:00
// Just as an extra check we eventually delete any existing Steward socket files if found.
2021-05-12 07:50:03 +00:00
socketFilepath := filepath . Join ( c . SocketFolder , "steward.sock" )
if _ , err := os . Stat ( socketFilepath ) ; ! os . IsNotExist ( err ) {
err = os . Remove ( socketFilepath )
if err != nil {
er := fmt . Errorf ( "error: could not delete sock file: %v" , err )
2021-07-02 09:26:52 +00:00
cancel ( )
2021-05-12 07:50:03 +00:00
return nil , er
}
2021-03-30 08:37:16 +00:00
}
2021-08-11 10:23:37 +00:00
// Open the socket.
2021-05-12 07:50:03 +00:00
nl , err := net . Listen ( "unix" , socketFilepath )
2021-03-30 08:37:16 +00:00
if err != nil {
2021-04-16 11:18:10 +00:00
er := fmt . Errorf ( "error: failed to open socket: %v" , err )
2021-07-02 09:26:52 +00:00
cancel ( )
2021-04-16 11:18:10 +00:00
return nil , er
2021-03-30 08:37:16 +00:00
}
2021-07-02 06:38:44 +00:00
// ---
// Prepare the connection to the Stew socket file
// Check if socket folder exists, if not create it
if _ , err := os . Stat ( c . SocketFolder ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( c . SocketFolder , 0700 )
if err != nil {
2021-07-02 09:26:52 +00:00
cancel ( )
2021-07-02 06:38:44 +00:00
return nil , fmt . Errorf ( "error: failed to create socket folder directory %v: %v" , c . SocketFolder , err )
}
}
stewSocketFilepath := filepath . Join ( c . SocketFolder , "stew.sock" )
2021-08-11 10:23:37 +00:00
// Just as an extra check we eventually delete any existing Stew socket files if found.
2021-07-02 06:38:44 +00:00
if _ , err := os . Stat ( stewSocketFilepath ) ; ! os . IsNotExist ( err ) {
err = os . Remove ( stewSocketFilepath )
if err != nil {
er := fmt . Errorf ( "error: could not delete stew.sock file: %v" , err )
2021-07-02 09:26:52 +00:00
cancel ( )
2021-07-02 06:38:44 +00:00
return nil , er
}
}
stewNL , err := net . Listen ( "unix" , stewSocketFilepath )
if err != nil {
er := fmt . Errorf ( "error: failed to open stew socket: %v" , err )
2021-07-02 09:26:52 +00:00
cancel ( )
2021-07-02 06:38:44 +00:00
return nil , er
}
// ---
2021-03-31 06:56:13 +00:00
metrics := newMetrics ( c . PromHostAndPort )
2021-02-01 12:41:04 +00:00
s := & server {
2021-08-25 06:31:48 +00:00
ctx : ctx ,
cancel : cancel ,
configuration : c ,
nodeName : c . NodeName ,
natsConn : conn ,
StewardSocket : nl ,
StewSocket : stewNL ,
processes : newProcesses ( ctx , metrics ) ,
newMessagesCh : make ( chan [ ] subjectAndMessage ) ,
metrics : metrics ,
2021-09-10 08:04:25 +00:00
version : version ,
2021-02-01 12:41:04 +00:00
}
2021-01-29 05:09:48 +00:00
2021-03-02 12:46:02 +00:00
// Create the default data folder for where subscribers should
2021-03-25 11:50:58 +00:00
// write it's data, check if data folder exist, and create it if needed.
2021-03-02 12:46:02 +00:00
if _ , err := os . Stat ( c . SubscribersDataFolder ) ; os . IsNotExist ( err ) {
if c . SubscribersDataFolder == "" {
return nil , fmt . Errorf ( "error: subscribersDataFolder value is empty, you need to provide the config or the flag value at startup %v: %v" , c . SubscribersDataFolder , err )
}
err := os . Mkdir ( c . SubscribersDataFolder , 0700 )
if err != nil {
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to create data folder directory %v: %v" , c . SubscribersDataFolder , err )
2021-03-02 12:46:02 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , c . SubscribersDataFolder )
}
2021-02-05 06:25:12 +00:00
return s , nil
}
2021-02-24 09:58:02 +00:00
// Start will spawn up all the predefined subscriber processes.
2021-02-10 06:25:44 +00:00
// Spawning of publisher processes is done on the fly by checking
2021-02-24 09:58:02 +00:00
// if there is publisher process for a given message subject, and
2021-08-16 11:01:12 +00:00
// if it does not exist it will spawn one.
2021-02-10 04:11:48 +00:00
func ( s * server ) Start ( ) {
2021-09-23 03:46:25 +00:00
log . Printf ( "Starting steward, version=%+v\n" , s . version )
2021-09-10 08:21:33 +00:00
s . metrics . promVersion . With ( prometheus . Labels { "version" : string ( s . version ) } )
2021-09-10 08:04:25 +00:00
2021-02-19 10:07:09 +00:00
// Start the error kernel that will do all the error handling
2021-08-03 11:57:29 +00:00
// that is not done within a process.
2021-08-04 08:37:24 +00:00
s . errorKernel = newErrorKernel ( s . ctx )
2021-08-03 11:43:05 +00:00
2021-08-04 08:37:24 +00:00
go func ( ) {
2021-08-25 06:31:48 +00:00
err := s . errorKernel . start ( s . newMessagesCh )
2021-08-04 08:37:24 +00:00
if err != nil {
log . Printf ( "%v\n" , err )
}
} ( )
2021-02-19 10:07:09 +00:00
2021-02-18 11:29:14 +00:00
// Start collecting the metrics
2021-08-03 11:57:29 +00:00
go func ( ) {
2021-08-04 08:37:24 +00:00
err := s . metrics . start ( )
2021-08-03 11:57:29 +00:00
if err != nil {
log . Printf ( "%v\n" , err )
os . Exit ( 1 )
}
} ( )
2021-02-18 11:29:14 +00:00
2021-03-29 11:36:30 +00:00
// Start the checking the input socket for new messages from operator.
2021-08-25 08:16:55 +00:00
go s . readSocket ( )
2021-02-26 08:02:53 +00:00
2021-09-10 03:26:16 +00:00
// Check if we should start the tcp listener for new messages from operator.
2021-08-23 14:00:48 +00:00
if s . configuration . TCPListener != "" {
2021-09-09 11:32:04 +00:00
go s . readTCPListener ( )
2021-08-23 14:00:48 +00:00
}
2021-09-10 03:26:16 +00:00
// Check if we should start the http listener for new messages from operator.
if s . configuration . HTTPListener != "" {
go s . readHttpListener ( )
}
2021-08-11 08:11:57 +00:00
// Start up the predefined subscribers.
//
// Since all the logic to handle processes are tied to the process
// struct, we need to create an initial process to start the rest.
2021-08-11 10:23:37 +00:00
//
// NB: The context of the initial process are set in processes.Start.
2021-08-09 07:18:30 +00:00
sub := newSubject ( REQInitial , s . nodeName )
2021-09-08 15:57:21 +00:00
p := newProcess ( context . TODO ( ) , s . metrics , s . natsConn , s . processes , s . newMessagesCh , s . configuration , sub , s . errorKernel . errorCh , "" , nil )
2021-08-11 08:11:57 +00:00
// Start all wanted subscriber processes.
s . processes . Start ( p )
2021-08-03 11:43:05 +00:00
2021-08-09 07:18:30 +00:00
time . Sleep ( time . Second * 1 )
s . processes . printProcessesMap ( )
2021-02-18 13:27:53 +00:00
2021-08-23 10:47:33 +00:00
// Start exposing the the data folder via HTTP if flag is set.
if s . configuration . ExposeDataFolder != "" {
log . Printf ( "info: Starting expose of data folder via HTTP\n" )
go s . exposeDataFolder ( s . ctx )
}
2021-08-09 07:18:30 +00:00
// Start the processing of new messages from an input channel.
2021-09-15 06:39:34 +00:00
// NB: We might need to create a sub context for the ringbuffer here
// so we can cancel this context last, and not use the server.
2021-08-25 06:50:24 +00:00
s . routeMessagesToProcess ( "./incomingBuffer.db" )
2021-02-10 06:25:44 +00:00
2021-08-09 07:18:30 +00:00
}
2021-02-05 09:47:07 +00:00
2021-08-09 07:18:30 +00:00
// Will stop all processes started during startup.
func ( s * server ) Stop ( ) {
// Stop the started pub/sub message processes.
2021-08-11 10:23:37 +00:00
s . processes . Stop ( )
2021-08-09 07:18:30 +00:00
log . Printf ( "info: stopped all subscribers\n" )
2021-07-02 06:38:44 +00:00
2021-08-09 07:18:30 +00:00
// Stop the errorKernel.
s . errorKernel . stop ( )
log . Printf ( "info: stopped the errorKernel\n" )
2021-07-02 11:26:30 +00:00
2021-08-09 07:18:30 +00:00
// Stop the main context.
2021-08-11 10:23:37 +00:00
s . cancel ( )
2021-08-09 07:18:30 +00:00
log . Printf ( "info: stopped the main context\n" )
2021-08-09 12:41:31 +00:00
// Delete the socket file when the program exits.
socketFilepath := filepath . Join ( s . configuration . SocketFolder , "steward.sock" )
if _ , err := os . Stat ( socketFilepath ) ; ! os . IsNotExist ( err ) {
err = os . Remove ( socketFilepath )
if err != nil {
er := fmt . Errorf ( "error: could not delete sock file: %v" , err )
log . Printf ( "%v\n" , er )
}
}
2021-02-05 09:47:07 +00:00
}
2021-02-25 10:08:05 +00:00
// sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
2021-09-07 07:43:54 +00:00
func sendErrorLogMessage ( conf * Configuration , metrics * metrics , newMessagesCh chan <- [ ] subjectAndMessage , FromNode Node , theError error ) {
2021-03-26 04:13:51 +00:00
// NB: Adding log statement here for more visuality during development.
log . Printf ( "%v\n" , theError )
2021-09-07 07:43:54 +00:00
sam := createErrorMsgContent ( conf , FromNode , theError )
2021-02-25 10:08:05 +00:00
newMessagesCh <- [ ] subjectAndMessage { sam }
2021-08-26 10:26:08 +00:00
metrics . promErrorMessagesSentTotal . Inc ( )
2021-02-25 10:08:05 +00:00
}
2021-02-24 14:43:31 +00:00
2021-09-23 06:31:30 +00:00
// sendInfoLogMessage logs theError locally and puts it on newMessagesCh
// as a message. When the message has been handed over, the
// promInfoMessagesSentTotal metric is incremented.
//
// The message is built with createErrorMsgContent, the same helper
// sendErrorLogMessage uses. The channel send is a plain blocking send.
func sendInfoLogMessage(conf *Configuration, metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
	// NB: Logging here as well for more visibility during development.
	log.Printf("%v\n", theError)

	newMessagesCh <- []subjectAndMessage{createErrorMsgContent(conf, FromNode, theError)}
	metrics.promInfoMessagesSentTotal.Inc()
}
2021-02-25 10:08:05 +00:00
// createErrorMsgContent will prepare a subject and message with the content
// of the error
2021-09-07 07:43:54 +00:00
func createErrorMsgContent ( conf * Configuration , FromNode Node , theError error ) subjectAndMessage {
2021-04-06 07:06:26 +00:00
// Add time stamp
2021-09-23 06:19:53 +00:00
er := fmt . Sprintf ( "%v, node: %v, %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , FromNode , theError . Error ( ) )
2021-04-06 07:06:26 +00:00
2021-02-24 14:43:31 +00:00
sam := subjectAndMessage {
2021-04-06 05:56:49 +00:00
Subject : newSubject ( REQErrorLog , "errorCentral" ) ,
2021-02-24 14:43:31 +00:00
Message : Message {
2021-09-07 07:43:54 +00:00
Directory : "errorLog" ,
ToNode : "errorCentral" ,
FromNode : FromNode ,
FileName : "error.log" ,
Data : [ ] string { er } ,
Method : REQErrorLog ,
ACKTimeout : conf . ErrorMessageTimeout ,
Retries : conf . ErrorMessageRetries ,
2021-02-24 14:43:31 +00:00
} ,
}
2021-02-25 10:08:05 +00:00
return sam
2021-02-24 14:43:31 +00:00
}
2021-03-09 06:43:55 +00:00
2021-07-05 05:43:33 +00:00
// Contains the sam value as it is used in the state DB, and also a
// delivered function to be called when this message is picked up, so
// we can control if messages gets stale at some point.
2021-07-02 17:09:42 +00:00
type samDBValueAndDelivered struct {
2021-07-02 16:32:01 +00:00
samDBValue samDBValue
2021-07-05 05:43:33 +00:00
delivered func ( )
2021-07-02 16:32:01 +00:00
}
2021-08-25 06:50:24 +00:00
// routeMessagesToProcess takes a database name it's input argument.
// The database will be used as the persistent k/v store for the work
// queue which is implemented as a ring buffer.
// The newMessagesCh are where we get new messages to publish.
2021-03-09 06:43:55 +00:00
// Incomming messages will be routed to the correct subject process, where
// the handling of each nats subject is handled within it's own separate
// worker process.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
2021-08-25 06:50:24 +00:00
func ( s * server ) routeMessagesToProcess ( dbFileName string ) {
2021-03-09 06:43:55 +00:00
// Prepare and start a new ring buffer
const bufferSize int = 1000
2021-09-14 14:23:01 +00:00
const samValueBucket string = "samValueBucket"
const indexValueBucket string = "indexValueBucket"
2021-09-15 07:17:35 +00:00
s . ringBuffer = newringBuffer ( s . ctx , s . metrics , s . configuration , bufferSize , dbFileName , Node ( s . nodeName ) , s . newMessagesCh , samValueBucket , indexValueBucket )
2021-08-26 05:02:36 +00:00
2021-08-25 06:31:48 +00:00
ringBufferInCh := make ( chan subjectAndMessage )
2021-07-02 17:09:42 +00:00
ringBufferOutCh := make ( chan samDBValueAndDelivered )
2021-03-09 06:43:55 +00:00
// start the ringbuffer.
2021-11-18 05:50:25 +00:00
s . ringBuffer . start ( s . ctx , ringBufferInCh , ringBufferOutCh )
2021-03-09 06:43:55 +00:00
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.
go func ( ) {
2021-08-25 08:16:55 +00:00
for sams := range s . newMessagesCh {
for _ , sam := range sams {
2021-08-25 06:31:48 +00:00
ringBufferInCh <- sam
2021-03-09 06:43:55 +00:00
}
}
2021-08-25 06:31:48 +00:00
close ( ringBufferInCh )
2021-03-09 06:43:55 +00:00
} ( )
// Process the messages that are in the ring buffer. Check and
// send if there are a specific subject for it, and if no subject
// exist throw an error.
var coe CommandOrEvent
coeAvailable := coe . GetCommandOrEventAvailable ( )
var method Method
methodsAvailable := method . GetMethodsAvailable ( )
go func ( ) {
2021-11-09 12:18:58 +00:00
for samDBVal := range ringBufferOutCh {
2021-09-13 11:15:21 +00:00
// Signal back to the ringbuffer that message have been picked up.
2021-11-09 12:18:58 +00:00
samDBVal . delivered ( )
2021-09-13 05:02:14 +00:00
2021-11-09 12:18:58 +00:00
sam := samDBVal . samDBValue . Data
2021-03-09 06:43:55 +00:00
// Check if the format of the message is correct.
if _ , ok := methodsAvailable . CheckIfExists ( sam . Message . Method ) ; ! ok {
2021-03-12 11:08:11 +00:00
er := fmt . Errorf ( "error: routeMessagesToProcess: the method do not exist, message dropped: %v" , sam . Message . Method )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( s . configuration , s . metrics , s . newMessagesCh , Node ( s . nodeName ) , er )
2021-03-09 06:43:55 +00:00
continue
}
if ! coeAvailable . CheckIfExists ( sam . Subject . CommandOrEvent , sam . Subject ) {
2021-03-12 11:08:11 +00:00
er := fmt . Errorf ( "error: routeMessagesToProcess: the command or event do not exist, message dropped: %v" , sam . Message . Method )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( s . configuration , s . metrics , s . newMessagesCh , Node ( s . nodeName ) , er )
2021-03-12 11:08:11 +00:00
2021-03-09 06:43:55 +00:00
continue
}
2021-09-21 21:29:42 +00:00
for {
2021-11-10 10:21:38 +00:00
// Looping here so we are able to redo the sending
// of the last message if a process for the specified subject
2021-09-21 21:29:42 +00:00
// is not present. The process will then be created, and
// the code will loop back here.
m := sam . Message
2021-11-10 10:21:38 +00:00
// Check if it is a relay message
2021-11-10 11:58:23 +00:00
if m . RelayViaNode != "" && m . RelayViaNode != Node ( s . nodeName ) {
2021-11-19 08:35:53 +00:00
2021-11-10 10:21:38 +00:00
// Keep the original values.
m . RelayFromNode = m . FromNode
m . RelayToNode = m . ToNode
2021-11-19 08:35:53 +00:00
m . RelayOriginalViaNode = m . RelayViaNode
2021-11-10 10:21:38 +00:00
m . RelayOriginalMethod = m . Method
2021-11-19 08:35:53 +00:00
// Convert it to a relay initial message.
m . Method = REQRelayInitial
// Set the toNode of the message to this host, so we send
// it to ourselves again and pick it up with the subscriber
// for the REQReplyInitial handler method.
m . ToNode = Node ( s . nodeName )
// We are now done with the initial checking for if the new
// message is a relay message, so we empty the viaNode field
// so we don't end in an endless loop here.
// The value is stored in RelayOriginalViaNode for later use.
m . RelayViaNode = ""
2021-11-10 10:21:38 +00:00
2021-11-19 08:35:53 +00:00
sam . Subject = newSubject ( REQRelayInitial , string ( s . nodeName ) )
2021-11-10 10:21:38 +00:00
}
2021-09-21 21:29:42 +00:00
subjName := sam . Subject . name ( )
pn := processNameGet ( subjName , processKindPublisher )
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
2021-11-16 09:21:44 +00:00
s . processes . active . mu . Lock ( )
2021-11-16 18:07:24 +00:00
proc , ok := s . processes . active . procNames [ pn ]
2021-11-16 09:21:44 +00:00
s . processes . active . mu . Unlock ( )
2021-09-13 11:15:21 +00:00
2021-09-21 21:29:42 +00:00
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
2021-11-16 09:21:44 +00:00
if ok {
2021-09-21 21:29:42 +00:00
// We have found the process to route the message to, deliver it.
proc . subject . messageCh <- m
break
} else {
2021-11-10 05:22:03 +00:00
// If a publisher process do not exist for the given subject, create it.
2021-09-21 21:29:42 +00:00
log . Printf ( "info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n" , subjName )
sub := newSubject ( sam . Subject . Method , sam . Subject . ToNode )
proc := newProcess ( s . ctx , s . metrics , s . natsConn , s . processes , s . newMessagesCh , s . configuration , sub , s . errorKernel . errorCh , processKindPublisher , nil )
2021-11-09 13:01:42 +00:00
2021-09-21 21:29:42 +00:00
proc . spawnWorker ( s . processes , s . natsConn )
log . Printf ( "info: processNewMessages: new process started, subject: %v, processID: %v\n" , subjName , proc . processID )
2021-11-10 05:22:03 +00:00
// Now when the process is spawned we continue,
2021-09-21 21:29:42 +00:00
// and send the message to that new process.
continue
}
2021-03-09 06:43:55 +00:00
}
}
} ( )
}
2021-08-23 10:47:33 +00:00
func ( s * server ) exposeDataFolder ( ctx context . Context ) {
2021-08-23 15:05:56 +00:00
fileHandler := func ( w http . ResponseWriter , r * http . Request ) {
// w.Header().Set("Content-Type", "text/html")
http . FileServer ( http . Dir ( s . configuration . SubscribersDataFolder ) ) . ServeHTTP ( w , r )
}
2021-08-23 10:47:33 +00:00
//create a file server, and serve the files found in ./
2021-08-23 15:05:56 +00:00
//fd := http.FileServer(http.Dir(s.configuration.SubscribersDataFolder))
http . HandleFunc ( "/" , fileHandler )
2021-08-23 10:47:33 +00:00
// we create a net.Listen type to use later with the http.Serve function.
nl , err := net . Listen ( "tcp" , s . configuration . ExposeDataFolder )
if err != nil {
log . Println ( "error: starting net.Listen: " , err )
}
// start the web server with http.Serve instead of the usual http.ListenAndServe
err = http . Serve ( nl , nil )
if err != nil {
log . Printf ( "Error: failed to start web server: %v\n" , err )
}
os . Exit ( 1 )
}