1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

initial test package created

This commit is contained in:
postmannen 2021-08-09 14:41:31 +02:00
parent 1a3e310cd1
commit e28eb3770d
5 changed files with 102 additions and 43 deletions

View file

@ -14,7 +14,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
// Loop, and wait for new connections. // Loop, and wait for new connections.
for { for {
conn, err := s.StewardSockListener.Accept() conn, err := s.StewardSocket.Accept()
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err) er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)

View file

@ -107,7 +107,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
func() { func() {
s, err := r.dumpBucket(samValueBucket) s, err := r.dumpBucket(samValueBucket)
if err != nil { if err != nil {
er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previos entries in db to process: %v", err) er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return return
//sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er) //sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)

View file

@ -76,9 +76,9 @@ type server struct {
// The nats connection to the broker // The nats connection to the broker
natsConn *nats.Conn natsConn *nats.Conn
// net listener for communicating via the steward socket // net listener for communicating via the steward socket
StewardSockListener net.Listener StewardSocket net.Listener
// net listener for the communication with Stew // net listener for the communication with Stew
StewSockListener net.Listener StewSocket net.Listener
// processes holds all the information about running processes // processes holds all the information about running processes
processes *processes processes *processes
// The name of the node // The name of the node
@ -155,7 +155,6 @@ func NewServer(c *Configuration) (*server, error) {
} }
socketFilepath := filepath.Join(c.SocketFolder, "steward.sock") socketFilepath := filepath.Join(c.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) { if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath) err = os.Remove(socketFilepath)
if err != nil { if err != nil {
@ -166,6 +165,7 @@ func NewServer(c *Configuration) (*server, error) {
} }
nl, err := net.Listen("unix", socketFilepath) nl, err := net.Listen("unix", socketFilepath)
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to open socket: %v", err) er := fmt.Errorf("error: failed to open socket: %v", err)
cancel() cancel()
@ -208,16 +208,16 @@ func NewServer(c *Configuration) (*server, error) {
metrics := newMetrics(c.PromHostAndPort) metrics := newMetrics(c.PromHostAndPort)
s := &server{ s := &server{
ctx: ctx, ctx: ctx,
ctxCancelFunc: cancel, ctxCancelFunc: cancel,
configuration: c, configuration: c,
nodeName: c.NodeName, nodeName: c.NodeName,
natsConn: conn, natsConn: conn,
StewardSockListener: nl, StewardSocket: nl,
StewSockListener: stewNL, StewSocket: stewNL,
processes: newProcesses(metrics.promRegistry), processes: newProcesses(metrics.promRegistry),
toRingbufferCh: make(chan []subjectAndMessage), toRingbufferCh: make(chan []subjectAndMessage),
metrics: metrics, metrics: metrics,
} }
// Create the default data folder for where subscribers should // Create the default data folder for where subscribers should
@ -266,19 +266,6 @@ func (s *server) Start() {
// Start the checking the input socket for new messages from operator. // Start the checking the input socket for new messages from operator.
go s.readSocket(s.toRingbufferCh) go s.readSocket(s.toRingbufferCh)
// Delete the socket file when the program exits.
defer func() {
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
log.Printf("%v\n", er)
}
}
}()
// Start up the predefined subscribers. Since all the logic to handle // Start up the predefined subscribers. Since all the logic to handle
// processes are tied to the process struct, we need to create an // processes are tied to the process struct, we need to create an
// initial process to start the rest. // initial process to start the rest.
@ -297,6 +284,10 @@ func (s *server) Start() {
// Will stop all processes started during startup. // Will stop all processes started during startup.
func (s *server) Stop() { func (s *server) Stop() {
// TODO: Add done sync functionality within the
// stop functions so we get a confirmation that
// all processes actually are stopped.
// Stop the started pub/sub message processes. // Stop the started pub/sub message processes.
s.ctxSubscribersCancelFunc() s.ctxSubscribersCancelFunc()
log.Printf("info: stopped all subscribers\n") log.Printf("info: stopped all subscribers\n")
@ -308,6 +299,18 @@ func (s *server) Stop() {
// Stop the main context. // Stop the main context.
s.ctxCancelFunc() s.ctxCancelFunc()
log.Printf("info: stopped the main context\n") log.Printf("info: stopped the main context\n")
// Delete the socket file when the program exits.
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
err = os.Remove(socketFilepath)
if err != nil {
er := fmt.Errorf("error: could not delete sock file: %v", err)
log.Printf("%v\n", er)
}
}
} }
func (p *processes) printProcessesMap() { func (p *processes) printProcessesMap() {

View file

@ -1,7 +1,11 @@
package steward package steward
import ( import (
"io"
"net"
"os"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
"time" "time"
@ -24,18 +28,22 @@ func TestSteward(t *testing.T) {
natsserver.PrintAndDie(err.Error()) natsserver.PrintAndDie(err.Error())
} }
// Start Steward instance for testing // Start Steward instance
tempdir := t.TempDir() // ---------------------------------------
tempdir := "./tmp"
conf := &Configuration{ conf := &Configuration{
SocketFolder: filepath.Join(tempdir, "tmp"), SocketFolder: filepath.Join(tempdir, "tmp"),
DatabaseFolder: filepath.Join(tempdir, "var/lib"), DatabaseFolder: filepath.Join(tempdir, "var/lib"),
SubscribersDataFolder: filepath.Join(tempdir, "data"), SubscribersDataFolder: filepath.Join(tempdir, "data"),
BrokerAddress: "127.0.0.1:40222", BrokerAddress: "127.0.0.1:40222",
NodeName: "central", NodeName: "central",
CentralNodeName: "central", CentralNodeName: "central",
DefaultMessageRetries: 1, DefaultMessageRetries: 1,
DefaultMessageTimeout: 3, DefaultMessageTimeout: 3,
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}},
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
} }
s, err := NewServer(conf) s, err := NewServer(conf)
if err != nil { if err != nil {
@ -44,11 +52,59 @@ func TestSteward(t *testing.T) {
s.Start() s.Start()
// Messaging tests.
//
// Write to socket.
// ---------------------------------------
want := "apekatt"
socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock"))
if err != nil {
t.Fatalf("error: failed to open socket file for writing: %v\n", err)
}
defer socket.Close()
m := `[
{
"directory":"commands-executed",
"fileExtension":".result",
"toNode": "central",
"data": ["bash","-c","echo apekatt"],
"replyMethod":"REQToFileAppend",
"method":"REQnCliCommand",
"ACKTimeout":3,
"retries":3,
"methodTimeout": 10
}
]`
_, err = socket.Write([]byte(m))
if err != nil {
t.Fatalf("error: failed to write to socket: %v\n", err)
}
// Wait a couple of seconds for the request to go through..
time.Sleep(time.Second * 2)
resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQnCliCommand.result")
fh, err := os.Open(resultFile)
if err != nil {
t.Fatalf("error: failed open result file: %v\n", err)
}
result, err := io.ReadAll(fh)
if err != nil {
t.Fatalf("error: failed read result file: %v\n", err)
}
if strings.Contains(string(result), want) {
t.Fatalf("error: did not find expexted word `%v` in file ", want)
}
// ---------------------------------------
s.Stop() s.Stop()
// Shutdown services // Shutdown services.
ns.Shutdown() ns.Shutdown()
time.Sleep(time.Second * 5)
} }

View file

@ -522,7 +522,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
// Open file and write data. // Open file and write data.
file := filepath.Join(folderTree, fileName) file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, os.ModeAppend) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to open file : %v, message: %v", err, message) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to open file : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)