1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Merge branch 'dev' into main

This commit is contained in:
postmannen 2024-03-08 22:57:11 +01:00
commit 34e0c5d1c6
12 changed files with 96 additions and 59 deletions

View file

@ -1,5 +1,5 @@
# build stage
FROM golang:1.17.7-alpine AS build-env
FROM golang:1.22-alpine AS build-env
RUN apk --no-cache add build-base git gcc
RUN mkdir -p /build
@ -17,8 +17,6 @@ RUN apk update && apk add curl && apk add nmap
WORKDIR /app
COPY --from=build-env /build/cmd/ctrl/ctrl /app/
ENV RING_BUFFER_PERSIST_STORE "1"
ENV RING_BUFFER_SIZE "1000"
ENV CONFIG_FOLDER "./etc"
ENV SOCKET_FOLDER "./tmp"
ENV TCP_LISTENER ""
@ -41,6 +39,7 @@ ENV SUBSCRIBERS_DATA_FOLDER "./var"
ENV CENTRAL_NODE_NAME "central"
ENV ROOT_CA_PATH ""
ENV NKEY_SEED_FILE ""
ENV NKEY_SEED ""
ENV EXPOSE_DATA_FOLDER "127.0.0.1:8090"
ENV ERROR_MESSAGE_RETRIES 3
ENV ERROR_MESSAGE_TIMEOUT 10
@ -58,25 +57,21 @@ ENV START_PUB_REQ_HELLO 60
ENV ENABLE_KEY_UPDATES "1"
ENV ENABLE_ACL_UPDATES "1"
ENV IS_CENTRAL_ERROR_LOGGER ""
ENV START_SUB_REQ_HELLO ""
ENV START_SUB_REQ_TO_FILE_APPEND ""
ENV START_SUB_REQ_TO_FILE ""
ENV START_SUB_REQ_TO_FILE_NACK ""
ENV START_SUB_REQ_COPY_SRC ""
ENV START_SUB_REQ_COPY_DST ""
ENV START_SUB_REQ_PING ""
ENV START_SUB_REQ_PONG ""
ENV START_SUB_REQ_CLI_COMMAND ""
ENV START_SUB_REQ_TO_CONSOLE ""
ENV START_SUB_REQ_HTTP_GET ""
ENV START_SUB_REQ_HTTP_GET_SCHEDULED ""
ENV START_SUB_REQ_TAIL_FILE ""
ENV START_SUB_REQ_CLI_COMMAND_CONT ""
ENV IS_CENTRAL_ERROR_LOGGER "0"
ENV START_SUB_REQ_HELLO "1"
ENV START_SUB_REQ_TO_FILE_APPEND "1"
ENV START_SUB_REQ_TO_FILE "1"
ENV START_SUB_REQ_TO_FILE_NACK "1"
ENV START_SUB_REQ_COPY_SRC "1"
ENV START_SUB_REQ_COPY_DST "1"
ENV START_SUB_REQ_CLI_COMMAND "1"
ENV START_SUB_REQ_TO_CONSOLE "1"
ENV START_SUB_REQ_HTTP_GET "1"
ENV START_SUB_REQ_HTTP_GET_SCHEDULED "1"
ENV START_SUB_REQ_TAIL_FILE "1"
ENV START_SUB_REQ_CLI_COMMAND_CONT "1"
CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\
-ringBufferPersistStore=${RING_BUFFER_PERSIST_STORE}\
-ringBufferSize=${RING_BUFFER_SIZE}\
-socketFolder=${SOCKET_FOLDER}\
-tcpListener=${TCP_LISTENER}\
-httpListener=${HTTP_LISTENER}\
@ -98,6 +93,7 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\
-centralNodeName=${CENTRAL_NODE_NAME}\
-rootCAPath=${ROOT_CA_PATH}\
-nkeySeedFile=${NKEY_SEED_FILE}\
-nkeySeed=${NKEY_SEED}\
-exposeDataFolder=${EXPOSE_DATA_FOLDER}\
-errorMessageRetries=${ERROR_MESSAGE_RETRIES}\
-errorMessageTimeout=${ERROR_MESSAGE_TIMEOUT}\
@ -120,7 +116,6 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\
-startSubREQCopySrc=${START_SUB_REQ_COPY_SRC}\
-startSubREQCopyDst=${START_SUB_REQ_COPY_DST}\
-startSubREQToFileNACK=${START_SUB_REQ_TO_FILE_NACK}\
-startSubREQPong=${START_SUB_REQ_PONG}\
-startSubREQCliCommand=${START_SUB_REQ_CLI_COMMAND}\
-startSubREQToConsole=${START_SUB_REQ_TO_CONSOLE}\
-startSubREQHttpGet=${START_SUB_REQ_HTTP_GET}\

View file

@ -308,7 +308,7 @@ func (c *centralAuth) generateACLsForAllNodes() error {
// defer a.schemaMain.mu.Unlock()
enc := json.NewEncoder(fh)
enc.SetEscapeHTML(false)
enc.Encode(c.accessLists.schemaMain.ACLMap)
err = enc.Encode(c.accessLists.schemaMain.ACLMap)
if err != nil {
er := fmt.Errorf("error: generateACLsForAllNodes: encoding json to file failed: %v, err: %v", c.accessLists.schemaMain.ACLMapFilePath, err)
c.errorKernel.logError(er, c.configuration)

View file

@ -69,7 +69,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
p := pki{
// schema: make(map[Node]map[argsString]signatureBase32),
nodesAcked: newNodesAcked(),
nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration),
nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(),
configuration: configuration,
bucketNamePublicKeys: "publicKeys",
errorKernel: errorKernel,
@ -305,7 +305,7 @@ func (c *centralAuth) updateHash(proc process, message Message) {
c.pki.nodesAcked.keysAndHash.Hash = hash
// Store the key to the db for persistence.
c.pki.dbUpdateHash(hash[:])
err = c.pki.dbUpdateHash(hash[:])
if err != nil {
er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err)
c.pki.errorKernel.errSend(proc, message, er, logError)
@ -398,7 +398,7 @@ type nodeNotAckedPublicKeys struct {
}
// newNodeNotAckedPublicKeys will return a prepared type of nodePublicKeys.
func newNodeNotAckedPublicKeys(configuration *Configuration) *nodeNotAckedPublicKeys {
func newNodeNotAckedPublicKeys() *nodeNotAckedPublicKeys {
n := nodeNotAckedPublicKeys{
KeyMap: make(map[Node][]byte),
}

View file

@ -69,6 +69,8 @@ type Configuration struct {
NkeyPublicKey string `toml:"-"`
//
NkeyFromED25519SSHKeyFile string `comment:"Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified"`
// NkeySeed
NkeySeed string `toml:"-"`
// The host and port to expose the data folder, <host>:<port>
ExposeDataFolder string `comment:"The host and port to expose the data folder, <host>:<port>"`
// Timeout in seconds for error messages
@ -169,6 +171,7 @@ type ConfigurationFromFile struct {
RootCAPath *string
NkeySeedFile *string
NkeyFromED25519SSHKeyFile *string
NkeySeed *string
ExposeDataFolder *string
ErrorMessageTimeout *int
ErrorMessageRetries *int
@ -236,6 +239,7 @@ func newConfigurationDefaults() Configuration {
RootCAPath: "",
NkeySeedFile: "",
NkeyFromED25519SSHKeyFile: "",
NkeySeed: "",
ExposeDataFolder: "",
ErrorMessageTimeout: 60,
ErrorMessageRetries: 10,
@ -402,6 +406,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.NkeyFromED25519SSHKeyFile = *cf.NkeyFromED25519SSHKeyFile
}
if cf.NkeySeed == nil {
conf.NkeySeed = cd.NkeySeed
} else {
conf.NkeySeed = *cf.NkeySeed
}
if cf.ExposeDataFolder == nil {
conf.ExposeDataFolder = cd.ExposeDataFolder
} else {
@ -613,6 +622,7 @@ func (c *Configuration) CheckFlags(version string) error {
flag.StringVar(&c.RootCAPath, "rootCAPath", fc.RootCAPath, "If TLS, enter the path for where to find the root CA certificate")
flag.StringVar(&c.NkeyFromED25519SSHKeyFile, "nkeyFromED25519SSHKeyFile", fc.NkeyFromED25519SSHKeyFile, "The full path of the nkeys seed file")
flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", fc.NkeySeedFile, "Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified")
flag.StringVar(&c.NkeySeed, "nkeySeed", fc.NkeySeed, "The actual nkey seed. To use if not stored in file")
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out")
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it")

2
go.mod
View file

@ -1,6 +1,6 @@
module github.com/postmannen/ctrl
go 1.21
go 1.22
require (
github.com/fsnotify/fsnotify v1.6.0

View file

@ -154,7 +154,7 @@ func (n *nodeAcl) saveToFile() error {
enc := json.NewEncoder(fh)
enc.SetEscapeHTML(false)
enc.Encode(n.aclAndHash)
err = enc.Encode(n.aclAndHash)
// HERE
// b, err := json.Marshal(n.aclAndHash)

View file

@ -125,7 +125,7 @@ type process struct {
// prepareNewProcess will set the the provided values and the default
// values for a process.
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
// create the initial configuration for a sessions communicating with 1 host process.
server.processes.mu.Lock()
server.processes.lastProcessID++

View file

@ -390,7 +390,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
}
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
proc.procFunc = pf
go proc.spawnWorker()
@ -400,7 +400,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er, p.configuration)
sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher, nil)
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
proc.procFunc = pf
proc.isLongRunningPublisher = true

View file

@ -225,14 +225,14 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error
// Create a new sub process that will do the actual file copying.
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message)
// assign a handler to the sub process
copySrcSubProc.handler = copySrcSubHandler(cia)
copySrcSubProc.handler = copySrcSubHandler()
// The process will be killed when the context expires.
go copySrcSubProc.spawnWorker()
@ -280,8 +280,8 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error
}
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
p := newProcess(ctx, server, subject, processKind, procFunc)
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
p := newProcess(ctx, server, subject, processKind)
p.isSubProcess = true
return p
@ -352,14 +352,14 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error
}
// Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel)
// assign a handler to the sub process
copyDstSubProc.handler = copyDstSubHandler(cia)
copyDstSubProc.handler = copyDstSubHandler()
// The process will be killed when the context expires.
go copyDstSubProc.spawnWorker()
@ -375,7 +375,7 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error
return ackMsg, nil
}
func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
func copySrcSubHandler() func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
// We should receive a ready message generated by the procFunc of Dst,
@ -396,7 +396,7 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by
return h
}
func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
func copyDstSubHandler() func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
select {

View file

@ -68,7 +68,7 @@ func methodREQOpProcessStart(proc process, message Message, node string) ([]byte
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
go procNew.spawnWorker()
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)

View file

@ -322,7 +322,7 @@ func TestRequest(t *testing.T) {
case fileContains:
resultFile := filepath.Join(tstConf.SubscribersDataFolder, tt.message.Directory, string(tt.message.FromNode), tt.message.FileName)
found, err := findStringInFileTest(string(tt.want), resultFile, tstConf, t)
found, err := findStringInFileTest(string(tt.want), resultFile)
if err != nil || found == false {
t.Fatalf(" \U0001F631 [FAILED] : %v: %v\n", tt.info, err)
@ -334,14 +334,14 @@ func TestRequest(t *testing.T) {
// --- Other REQ tests that does not fit well into the general table above.
checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir)
checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir)
checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir)
checkREQCopySrc(tstSrv, tstConf, t, tstTempDir)
checkREQTailFileTest(tstConf, t, tstTempDir)
checkMetricValuesTest(tstSrv, t)
checkErrorKernelMalformedJSONtest(tstConf, t)
checkREQCopySrc(tstConf, t, tstTempDir)
}
// Check the tailing of files type.
func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error {
func checkREQTailFileTest(conf *Configuration, t *testing.T, tmpDir string) error {
// Create a file with some content.
fp := filepath.Join(tmpDir, "test.file")
fh, err := os.OpenFile(fp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
@ -407,7 +407,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T,
cancel()
_, err = findStringInFileTest("some file content", resultFile, conf, t)
_, err = findStringInFileTest("some file content", resultFile)
if err != nil {
return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: %v", err)
}
@ -417,7 +417,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T,
}
// Check the file copier.
func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error {
func checkREQCopySrc(conf *Configuration, t *testing.T, tmpDir string) error {
testFiles := 5
for i := 1; i <= testFiles; i++ {
@ -474,7 +474,7 @@ func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpD
return nil
}
func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error {
func checkMetricValuesTest(ctrlServer *server, t *testing.T) error {
mfs, err := ctrlServer.metrics.promRegistry.Gather()
if err != nil {
return fmt.Errorf("error: promRegistry.gathering: %v", mfs)
@ -507,7 +507,7 @@ func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T
}
// Check errorKernel
func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error {
func checkErrorKernelMalformedJSONtest(conf *Configuration, t *testing.T) error {
// JSON message with error, missing brace.
m := `[
@ -550,7 +550,7 @@ func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration,
case <-chUpdated:
// We got an update, so we continue to check if we find the string we're
// looking for.
found, err := findStringInFileTest("error: malformed json", resultFile, conf, t)
found, err := findStringInFileTest("error: malformed json", resultFile)
if !found && err != nil {
return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: %v", err)
}
@ -605,7 +605,7 @@ func checkFileUpdated(fileRealPath string, fileUpdated chan bool) {
}
// Check if a file contains the given string.
func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) {
func findStringInFileTest(want string, fileName string) (bool, error) {
// Wait n seconds for the results file to be created
n := 50

View file

@ -97,7 +97,39 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
opt = nats.RootCAs(configuration.RootCAPath)
}
if configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "" {
switch {
case configuration.NkeySeed != "":
cwd, err := os.Getwd()
if err != nil {
cancel()
return nil, fmt.Errorf("error: failed to get current working directory when creating tmp seed file: %v", err)
}
pth := filepath.Join(cwd, "seed.txt")
// f, err := os.CreateTemp(pth, "")
// if err != nil {
// return nil, fmt.Errorf("error: failed to create tmp seed file: %v", err)
// }
err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0700)
if err != nil {
cancel()
return nil, fmt.Errorf("error: failed to write temp seed file: %v", err)
}
opt, err = nats.NkeyOptionFromSeed(pth)
if err != nil {
cancel()
return nil, fmt.Errorf("error: failed to read temp nkey seed file: %v", err)
}
err = os.Remove(pth)
if err != nil {
cancel()
return nil, fmt.Errorf("error: failed to remove temp seed file: %v", err)
}
case configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "":
var err error
opt, err = nats.NkeyOptionFromSeed(configuration.NkeySeedFile)
@ -105,9 +137,8 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
cancel()
return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err)
}
}
if configuration.NkeyFromED25519SSHKeyFile != "" {
case configuration.NkeyFromED25519SSHKeyFile != "":
var err error
opt, err = configuration.nkeyOptFromSSHKey()
@ -115,6 +146,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
cancel()
return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err)
}
}
var conn *nats.Conn
@ -302,7 +334,7 @@ func (s *server) Start() {
//
// NB: The context of the initial process are set in processes.Start.
sub := newSubject(REQInitial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s, sub, "", nil)
s.processInitial = newProcess(context.TODO(), s, sub, "")
// Start all wanted subscriber processes.
s.processes.Start(s.processInitial)
@ -312,7 +344,7 @@ func (s *server) Start() {
// Start exposing the the data folder via HTTP if flag is set.
if s.configuration.ExposeDataFolder != "" {
log.Printf("info: Starting expose of data folder via HTTP\n")
go s.exposeDataFolder(s.ctx)
go s.exposeDataFolder()
}
// Start the processing of new messages from an input channel.
@ -539,9 +571,9 @@ func (s *server) routeMessagesToProcess() {
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
proc = newProcess(s.ctx, s, sub, processKindPublisher)
}
proc.spawnWorker()
@ -565,7 +597,7 @@ func (s *server) routeMessagesToProcess() {
}()
}
func (s *server) exposeDataFolder(ctx context.Context) {
func (s *server) exposeDataFolder() {
fileHandler := func(w http.ResponseWriter, r *http.Request) {
// w.Header().Set("Content-Type", "text/html")
http.FileServer(http.Dir(s.configuration.SubscribersDataFolder)).ServeHTTP(w, r)