1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

updated comments

This commit is contained in:
postmannen 2021-08-16 13:01:12 +02:00
parent e35a65f395
commit c27a3dc188
13 changed files with 142 additions and 105 deletions

View file

@ -8,12 +8,16 @@ import (
"os/signal" "os/signal"
"time" "time"
_ "net/http/pprof" // _ "net/http/pprof"
"github.com/RaaLabs/steward" "github.com/RaaLabs/steward"
) )
func main() { func main() {
// defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
// defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
// defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop()
c := steward.NewConfiguration() c := steward.NewConfiguration()
err := c.CheckFlags() err := c.CheckFlags()
if err != nil { if err != nil {

View file

@ -48,14 +48,17 @@ const (
// Same as above, but No ACK. // Same as above, but No ACK.
EventNACK CommandOrEvent = "EventNACK" EventNACK CommandOrEvent = "EventNACK"
// eventCommand, just wait for the ACK that the // eventCommand, just wait for the ACK that the
// message is received. What action happens on the // message is received. What action happens is up to the
// receiving side is up to the received to decide. // received to decide.
) )
// CommandOrEventAvailable are used for checking if the
// commands or events are defined.
type CommandOrEventAvailable struct { type CommandOrEventAvailable struct {
topics map[CommandOrEvent]struct{} topics map[CommandOrEvent]struct{}
} }
// Check if a command or even exists.
func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent, subject Subject) bool { func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent, subject Subject) bool {
_, ok := co.topics[c] _, ok := co.topics[c]
if ok { if ok {

View file

@ -228,7 +228,6 @@ func (c *Configuration) CheckFlags() error {
return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help") return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help")
} }
// NB: Disabling the config file options for now.
if err := c.WriteConfigFile(); err != nil { if err := c.WriteConfigFile(); err != nil {
log.Printf("error: checkFlags: failed writing config file: %v\n", err) log.Printf("error: checkFlags: failed writing config file: %v\n", err)
os.Exit(1) os.Exit(1)

View file

@ -1,9 +1,9 @@
// The error kernel shall handle errors for a given process. // The error kernel shall handle errors for a given process.
// This will be cases where the process itself where unable // This will be cases where the process itself were unable
// to handle the error on it's own, and we might need to // to handle the error on it's own, and we might need to
// restart the process, or send a message back to the operator // restart the process, or send a message back to the operator
// that the action which the message where supposed to trigger, // that the action which the message where supposed to trigger
// or that an event where unable to be processed. // failed, or that an event where unable to be processed.
package steward package steward

View file

@ -61,6 +61,7 @@ type Message struct {
// --- // ---
// operation are used to specify opCmd and opArg's.
type Operation struct { type Operation struct {
OpCmd string `json:"opCmd"` OpCmd string `json:"opCmd"`
OpArg json.RawMessage `json:"opArg"` OpArg json.RawMessage `json:"opArg"`
@ -69,7 +70,7 @@ type Operation struct {
// --- // ---
// gobEncodePayload will encode the message structure into gob // gobEncodePayload will encode the message structure into gob
// binary format. // binary format before putting it into a nats message.
func gobEncodeMessage(m Message) ([]byte, error) { func gobEncodeMessage(m Message) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf) gobEnc := gob.NewEncoder(&buf)
@ -83,12 +84,13 @@ func gobEncodeMessage(m Message) ([]byte, error) {
// --- Subject // --- Subject
// Node is the type definition for the node who receive or send a message.
type Node string type Node string
// subject contains the representation of a subject to be used with one // subject contains the representation of a subject to be used with one
// specific process // specific process
type Subject struct { type Subject struct {
// node, the name of the node // node, the name of the node to receive the message.
ToNode string `json:"node" yaml:"toNode"` ToNode string `json:"node" yaml:"toNode"`
// messageType, command/event // messageType, command/event
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
@ -124,6 +126,7 @@ func newSubject(method Method, node string) Subject {
// subjectName is the complete representation of a subject // subjectName is the complete representation of a subject
type subjectName string type subjectName string
// Return a value of the subjectName for the subject as used with nats subject.
func (s Subject) name() subjectName { func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent)) return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))
} }

View file

@ -22,7 +22,8 @@ const (
processKindPublisher processKind = "publisher" processKindPublisher processKind = "publisher"
) )
// process are represent the communication to one individual host // process holds all the logic to handle a message type and it's
// method, subscription/publishin messages for a subject, and more.
type process struct { type process struct {
messageID int messageID int
// the subject used for the specific process. One process // the subject used for the specific process. One process
@ -189,7 +190,6 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
p.processName = pn p.processName = pn
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
idProcMap := make(map[int]process) idProcMap := make(map[int]process)
idProcMap[p.processID] = p idProcMap[p.processID] = p
@ -199,8 +199,9 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
} }
// messageDeliverNats will take care of the delivering the message // messageDeliverNats will take care of the delivering the message
// as converted to gob format as a nats.Message. It will also take // that is converted to gob format as a nats.Message. It will also
// care of checking timeouts and retries specified for the message. // take care of checking timeouts and retries specified for the
// message.
func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
retryAttempts := 0 retryAttempts := 0
@ -263,6 +264,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// did not receive a reply, decide what to do.. // did not receive a reply, decide what to do..
retryAttempts++ retryAttempts++
log.Printf("Retry attempts:%v, retries: %v, ACKTimeout: %v\n", retryAttempts, message.Retries, message.ACKTimeout) log.Printf("Retry attempts:%v, retries: %v, ACKTimeout: %v\n", retryAttempts, message.Retries, message.ACKTimeout)
switch { switch {
case message.Retries == 0: case message.Retries == 0:
// 0 indicates unlimited retries // 0 indicates unlimited retries
@ -308,7 +310,9 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
} }
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
switch { switch {
// Check for ACK type Commands or Event.
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
@ -342,6 +346,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, out) natsConn.Publish(msg.Reply, out)
// Check for NACK type Commands or Event.
case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK:
mf, ok := p.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
@ -372,7 +377,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
} }
// ---
default: default:
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
@ -380,16 +385,15 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
} }
} }
// Subscribe will start up a Go routine under the hood calling the // SubscribeMessage will register the Nats callback function for the specified
// callback function specified when a new message is received. // nats subject. This allows us to receive Nats messages for a given subject
// on a node.
func (p process) subscribeMessages() *nats.Subscription { func (p process) subscribeMessages() *nats.Subscription {
subject := string(p.subject.name()) subject := string(p.subject.name())
natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here. // Start up the subscriber handler.
// This is for being able to reply back the current publisher who sent
// the message.
go p.subscriberHandler(p.natsConn, p.configuration.NodeName, msg) go p.subscriberHandler(p.natsConn, p.configuration.NodeName, msg)
}) })
if err != nil { if err != nil {

View file

@ -38,12 +38,14 @@ func newProcesses(ctx context.Context, promRegistry *prometheus.Registry) *proce
active: make(map[processName]map[int]process), active: make(map[processName]map[int]process),
} }
// Prepare the main context for the subscribers. // Prepare the parent context for the subscribers.
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
p.ctx = ctx p.ctx = ctx
p.cancel = cancel p.cancel = cancel
// Register the metrics for the process.
p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{
Name: "total_running_processes", Name: "total_running_processes",
Help: "The current number of total running processes", Help: "The current number of total running processes",

View file

@ -28,6 +28,7 @@ func newMetrics(hostAndPort string) *metrics {
return &m return &m
} }
// Start the http interface for Prometheus metrics.
func (m *metrics) start() error { func (m *metrics) start() error {
//http.Handle("/metrics", promhttp.Handler()) //http.Handle("/metrics", promhttp.Handler())

View file

@ -57,6 +57,11 @@ func (s *server) writeStewSocket(toStewSocketCh []byte) {
//s.StewSockListener //s.StewSockListener
} }
// The subject are made up of different parts of the message field.
// To make things easier and to avoid figuring out what the subject
// is in all places we've created the concept of subjectAndMessage
// (sam) where we get the subject for the message once, and use the
// sam structure with subject alongside the message instead.
type subjectAndMessage struct { type subjectAndMessage struct {
Subject `json:"subject" yaml:"subject"` Subject `json:"subject" yaml:"subject"`
Message `json:"message" yaml:"message"` Message `json:"message" yaml:"message"`
@ -91,7 +96,7 @@ func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) {
} }
// newSAM will look up the correct values and value types to // newSAM will look up the correct values and value types to
// be used in a subject for a Message, and return the a combined structure // be used in a subject for a Message (sam), and return the a combined structure
// of type subjectAndMessage. // of type subjectAndMessage.
func newSAM(m Message) (subjectAndMessage, error) { func newSAM(m Message) (subjectAndMessage, error) {
// We need to create a tempory method type to look up the kind for the // We need to create a tempory method type to look up the kind for the

View file

@ -1,12 +1,11 @@
// Info: The idea about the ring buffer is that we have a FIFO // Info: The idea about the ring buffer is that we have a FIFO
// buffer where we store all incomming messages requested by // buffer where we store all incomming messages requested by
// operators. Each message processed will also be stored in a DB. // operators.
// // Each message in process or waiting to be processed will be
// Idea: All incomming messages should be handled from the in-memory // stored in a DB. When the processing of a given message is
// buffered channel, but when they are put on the buffer they should // done it will be removed from the state db, and an entry will
// also be written to the DB with a handled flag set to false. // made in the persistent message log.
// When a message have left the buffer the handled flag should be
// set to true.
package steward package steward
import ( import (
@ -33,16 +32,23 @@ type samDBValue struct {
// ringBuffer holds the data of the buffer, // ringBuffer holds the data of the buffer,
type ringBuffer struct { type ringBuffer struct {
bufData chan samDBValue // In memory buffer for the messages.
db *bolt.DB bufData chan samDBValue
// The database to use.
db *bolt.DB
// The current number of items in the database.
totalMessagesIndex int totalMessagesIndex int
mu sync.Mutex mu sync.Mutex
permStore chan string // The channel to send messages that have been processed,
nodeName Node // and we want to store it in the permanent message log.
newMessagesCh chan []subjectAndMessage permStore chan string
// Name of node.
nodeName Node
// New messages to the system to be put into the ring buffer.
newMessagesCh chan []subjectAndMessage
} }
// newringBuffer is a push/pop storage for values. // newringBuffer returns a push/pop storage for values.
func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
// --- // ---
// Check if socket folder exists, if not create it // Check if socket folder exists, if not create it
@ -128,7 +134,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
} }
// Check for incomming messages. These are typically comming from // Check for incomming messages. These are typically comming from
// the go routine who reads msg.pipe. // the go routine who reads the socket.
for v := range inCh { for v := range inCh {
// Check if the command or event exists in commandOrEvent.go // Check if the command or event exists in commandOrEvent.go
@ -166,7 +172,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
js, err := json.Marshal(samV) js, err := json.Marshal(samV)
if err != nil { if err != nil {
er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err) er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
} }
@ -207,7 +213,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
// We start the actual processing of an individual message here within // We start the actual processing of an individual message here within
// it's own go routine. Reason is that we don't want to block other // it's own go routine. Reason is that we don't want to block other
// messages being blocked while waiting for the done signal, or if an // messages to be processed while waiting for the done signal, or if an
// error with an individual message occurs. // error with an individual message occurs.
go func(v samDBValue) { go func(v samDBValue) {
v.Data.Message.done = make(chan struct{}) v.Data.Message.done = make(chan struct{})
@ -230,6 +236,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
case <-delivredCh: case <-delivredCh:
// OK. // OK.
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
// TODO: Check out if more logic should be made here if messages are stuck etc.
// Testing with a timeout here to figure out if messages are stuck // Testing with a timeout here to figure out if messages are stuck
// waiting for done signal. // waiting for done signal.
log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID)
@ -306,7 +313,7 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
return samDBValues, err return samDBValues, err
} }
// printBuckerContent will print out all they keys and values in the // printBucketContent will print out all they keys and values in the
// specified bucket. // specified bucket.
func (r *ringBuffer) printBucketContent(bucket string) error { func (r *ringBuffer) printBucketContent(bucket string) error {
err := r.db.View(func(tx *bolt.Tx) error { err := r.db.View(func(tx *bolt.Tx) error {
@ -363,10 +370,10 @@ func (r *ringBuffer) getIndexValue(indexBucket string) int {
return index return index
} }
// dbView will look up a specific value for a key in a bucket in a DB. // dbView will look up and return a specific value if it exists for a key in a bucket in a DB.
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) { func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
var value []byte var value []byte
//View is a help function to get values out of the database. // View is a help function to get values out of the database.
err := db.View(func(tx *bolt.Tx) error { err := db.View(func(tx *bolt.Tx) error {
//Open a bucket to get key's and values from. //Open a bucket to get key's and values from.
bu := tx.Bucket([]byte(bucket)) bu := tx.Bucket([]byte(bucket))
@ -429,9 +436,6 @@ func (r *ringBuffer) startPermanentStore() {
if err != nil { if err != nil {
log.Printf("error:failed to write entry: %v\n", err) log.Printf("error:failed to write entry: %v\n", err)
} }
// REMOVED: time
// time.Sleep(time.Second * 1)
} }
} }

View file

@ -56,6 +56,7 @@ func NewServer(c *Configuration) (*server, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var opt nats.Option var opt nats.Option
if c.RootCAPath != "" { if c.RootCAPath != "" {
opt = nats.RootCAs(c.RootCAPath) opt = nats.RootCAs(c.RootCAPath)
} }
@ -70,11 +71,10 @@ func NewServer(c *Configuration) (*server, error) {
} }
} }
// Connect to the nats server, and retry until succesful.
var conn *nats.Conn var conn *nats.Conn
const connRetryWait = 5 const connRetryWait = 5
// Connect to the nats server, and retry until succesful.
for { for {
var err error var err error
// Setting MaxReconnects to -1 which equals unlimited. // Setting MaxReconnects to -1 which equals unlimited.
@ -196,7 +196,7 @@ func NewServer(c *Configuration) (*server, error) {
// Start will spawn up all the predefined subscriber processes. // Start will spawn up all the predefined subscriber processes.
// Spawning of publisher processes is done on the fly by checking // Spawning of publisher processes is done on the fly by checking
// if there is publisher process for a given message subject, and // if there is publisher process for a given message subject, and
// not exist it will spawn one. // if it does not exist it will spawn one.
func (s *server) Start() { func (s *server) Start() {
// Start the error kernel that will do all the error handling // Start the error kernel that will do all the error handling
// that is not done within a process. // that is not done within a process.
@ -242,10 +242,6 @@ func (s *server) Start() {
// Will stop all processes started during startup. // Will stop all processes started during startup.
func (s *server) Stop() { func (s *server) Stop() {
// TODO: Add done sync functionality within the
// stop functions so we get a confirmation that
// all processes actually are stopped.
// Stop the started pub/sub message processes. // Stop the started pub/sub message processes.
s.processes.Stop() s.processes.Stop()
log.Printf("info: stopped all subscribers\n") log.Printf("info: stopped all subscribers\n")

View file

@ -40,6 +40,8 @@ func TestStewardServer(t *testing.T) {
// Start Steward instance // Start Steward instance
// --------------------------------------- // ---------------------------------------
// tempdir := t.TempDir() // tempdir := t.TempDir()
// Create the config to run a steward instance.
tempdir := "./tmp" tempdir := "./tmp"
conf := &Configuration{ conf := &Configuration{
SocketFolder: filepath.Join(tempdir, "tmp"), SocketFolder: filepath.Join(tempdir, "tmp"),
@ -200,7 +202,7 @@ func checkREQnCliCommandTest(conf *Configuration, t *testing.T) error {
return nil return nil
} }
// The non-sequential sending of CLI Commands. // The continous non-sequential sending of CLI Commands.
func checkREQnCliCommandContTest(conf *Configuration, t *testing.T) error { func checkREQnCliCommandContTest(conf *Configuration, t *testing.T) error {
m := `[ m := `[
{ {
@ -252,6 +254,7 @@ func checkREQHelloTest(conf *Configuration, t *testing.T) error {
return nil return nil
} }
// Check the error logger type.
func checkREQErrorLogTest(conf *Configuration, t *testing.T) error { func checkREQErrorLogTest(conf *Configuration, t *testing.T) error {
m := `[ m := `[
{ {
@ -275,6 +278,7 @@ func checkREQErrorLogTest(conf *Configuration, t *testing.T) error {
return nil return nil
} }
// Check http get method.
func checkREQHttpGetTest(conf *Configuration, t *testing.T) error { func checkREQHttpGetTest(conf *Configuration, t *testing.T) error {
// Web server for testing. // Web server for testing.
{ {
@ -313,8 +317,7 @@ func checkREQHttpGetTest(conf *Configuration, t *testing.T) error {
return nil return nil
} }
// --- // Check the tailing of files type.
func checkREQTailFileTest(conf *Configuration, t *testing.T) error { func checkREQTailFileTest(conf *Configuration, t *testing.T) error {
// Create a file with some content. // Create a file with some content.
fh, err := os.OpenFile("test.file", os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) fh, err := os.OpenFile("test.file", os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)

View file

@ -57,7 +57,7 @@ type Method string
// The constants that will be used throughout the system for // The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with. // when specifying what kind of Method to send or work with.
const ( const (
// Initial method used to start other processes. // Initial parent method used to start other processes.
REQInitial Method = "REQInitial" REQInitial Method = "REQInitial"
// Command for client operation request of the system. The op // Command for client operation request of the system. The op
// command to execute shall be given in the data field of the // command to execute shall be given in the data field of the
@ -136,7 +136,7 @@ const (
func (m Method) GetMethodsAvailable() MethodsAvailable { func (m Method) GetMethodsAvailable() MethodsAvailable {
// Command, Used to make a request to perform an action // Command, Used to make a request to perform an action
// Event, Used to communicate that an action has been performed. // Event, Used to communicate that something have happened.
ma := MethodsAvailable{ ma := MethodsAvailable{
Methodhandlers: map[Method]methodHandler{ Methodhandlers: map[Method]methodHandler{
REQInitial: methodREQInitial{ REQInitial: methodREQInitial{
@ -214,6 +214,7 @@ func (m Method) getHandler(method Method) methodHandler {
// ---- // ----
// Initial parent method used to start other processes.
type methodREQInitial struct { type methodREQInitial struct {
commandOrEvent CommandOrEvent commandOrEvent CommandOrEvent
} }
@ -229,6 +230,9 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([
} }
// ---- // ----
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
type MethodsAvailable struct { type MethodsAvailable struct {
Methodhandlers map[Method]methodHandler Methodhandlers map[Method]methodHandler
} }
@ -247,10 +251,51 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
} }
} }
// Create a new message for the reply containing the output of the
// action executed put in outData, and put it on the ringbuffer to
// be published.
// The method to use for the reply message should initially be
// specified within the first message as the replyMethod, and we will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
func newReplyMessage(proc process, message Message, outData []byte) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message.ReplyMethod == "" {
message.ReplyMethod = REQToFileAppend
}
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(outData)},
Method: message.ReplyMethod,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage: &message,
}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// Subscriber method handlers // Subscriber method handlers
// ------------------------------------------------------------ // ------------------------------------------------------------
// The methodHandler interface.
type methodHandler interface { type methodHandler interface {
handler(proc process, message Message, node string) ([]byte, error) handler(proc process, message Message, node string) ([]byte, error)
getKind() CommandOrEvent getKind() CommandOrEvent
@ -310,7 +355,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.mu.Unlock() proc.processes.mu.Unlock()
case "startProc": case "startProc":
// Set the interface type dst to &OpStart. // Set the empty interface type dst to &OpStart.
dst = &OpCmdStartProc{} dst = &OpCmdStartProc{}
err := json.Unmarshal(message.Operation.OpArg, &dst) err := json.Unmarshal(message.Operation.OpArg, &dst)
@ -361,8 +406,8 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
// Assert it into the correct non pointer value. // Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStopProc) arg := *dst.(*OpCmdStopProc)
// Based on the arg values received in the message we create can // Based on the arg values received in the message we create a
// create a processName structure as used in naming the real processes. // processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the // We can then use this processName to get the real values for the
// actual process we want to stop. // actual process we want to stop.
sub := newSubject(arg.Method, string(arg.RecevingNode)) sub := newSubject(arg.Method, string(arg.RecevingNode))
@ -387,6 +432,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.mu.Lock() proc.processes.mu.Lock()
// Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][arg.ID] toStopProc, ok := proc.processes.active[processName][arg.ID]
if ok { if ok {
// Delete the process from the processes map // Delete the process from the processes map
@ -429,47 +475,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
return ackMsg, nil return ackMsg, nil
} }
//-- //----
// Create a new message for the reply containing the output of the
// action executed put in outData, and put it on the ringbuffer to
// be published.
// The method to use for the reply message should initially be
// specified within the first message as the replyMethod, and we will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
func newReplyMessage(proc process, message Message, outData []byte) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message.ReplyMethod == "" {
message.ReplyMethod = REQToFileAppend
}
//--
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(outData)},
Method: message.ReplyMethod,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage: &message,
}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
//--
}
type methodREQToFileAppend struct { type methodREQToFileAppend struct {
commandOrEvent CommandOrEvent commandOrEvent CommandOrEvent
@ -479,6 +485,7 @@ func (m methodREQToFileAppend) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handle appending data to file.
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages // If it was a request type message we want to check what the initial messages
@ -546,6 +553,8 @@ func (m methodREQToFile) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages // If it was a request type message we want to check what the initial messages
@ -614,10 +623,10 @@ func (m methodREQHello) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handler for receiving hello messages.
func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) {
data := fmt.Sprintf("Received hello from %#v\n", message.FromNode) data := fmt.Sprintf("Received hello from %#v\n", message.FromNode)
// --------------------------
fileName := fmt.Sprintf("%v.%v%v", message.FromNode, message.Method, message.FileExtension) fileName := fmt.Sprintf("%v.%v%v", message.FromNode, message.Method, message.FileExtension)
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode)) folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode))
@ -667,11 +676,10 @@ func (m methodREQErrorLog) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handle the writing of error logs.
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data)
// --
// If it was a request type message we want to check what the initial messages // If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data. // method, so we can use that in creating the file name to store the data.
var fileName string var fileName string
@ -719,8 +727,6 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
// --
} }
// --- // ---
@ -733,7 +739,9 @@ func (m methodREQPing) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handle receving a ping.
func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) {
// TODO: Replace this with an append to file on receival.
log.Printf("<--- PING REQUEST received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- PING REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
@ -759,7 +767,9 @@ func (m methodREQPong) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handle receiving a pong.
func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) {
// TODO: Replace this with an append to file on receival.
log.Printf("<--- ECHO Reply received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- ECHO Reply received from: %v, containing: %v", message.FromNode, message.Data)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -916,6 +926,7 @@ func (m methodREQToConsole) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// Handler to write directly to console.
func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) {
fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data) fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data)
@ -933,7 +944,7 @@ func (m methodREQHttpGet) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// handler to run a Http Get. // handler to do a Http Get.
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
@ -1190,6 +1201,8 @@ func (m methodREQToSocket) getKind() CommandOrEvent {
return m.commandOrEvent return m.commandOrEvent
} }
// TODO:
// Handler to write to unix socket file.
func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) {
for _, d := range message.Data { for _, d := range message.Data {