diff --git a/Dockerfile b/Dockerfile index 5134c5b..6b1955c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # build stage -FROM golang:1.17.7-alpine AS build-env +FROM golang:1.22-alpine AS build-env RUN apk --no-cache add build-base git gcc RUN mkdir -p /build @@ -17,8 +17,6 @@ RUN apk update && apk add curl && apk add nmap WORKDIR /app COPY --from=build-env /build/cmd/ctrl/ctrl /app/ -ENV RING_BUFFER_PERSIST_STORE "1" -ENV RING_BUFFER_SIZE "1000" ENV CONFIG_FOLDER "./etc" ENV SOCKET_FOLDER "./tmp" ENV TCP_LISTENER "" @@ -41,6 +39,7 @@ ENV SUBSCRIBERS_DATA_FOLDER "./var" ENV CENTRAL_NODE_NAME "central" ENV ROOT_CA_PATH "" ENV NKEY_SEED_FILE "" +ENV NKEY_SEED "" ENV EXPOSE_DATA_FOLDER "127.0.0.1:8090" ENV ERROR_MESSAGE_RETRIES 3 ENV ERROR_MESSAGE_TIMEOUT 10 @@ -58,25 +57,21 @@ ENV START_PUB_REQ_HELLO 60 ENV ENABLE_KEY_UPDATES "1" ENV ENABLE_ACL_UPDATES "1" -ENV IS_CENTRAL_ERROR_LOGGER "" -ENV START_SUB_REQ_HELLO "" -ENV START_SUB_REQ_TO_FILE_APPEND "" -ENV START_SUB_REQ_TO_FILE "" -ENV START_SUB_REQ_TO_FILE_NACK "" -ENV START_SUB_REQ_COPY_SRC "" -ENV START_SUB_REQ_COPY_DST "" -ENV START_SUB_REQ_PING "" -ENV START_SUB_REQ_PONG "" -ENV START_SUB_REQ_CLI_COMMAND "" -ENV START_SUB_REQ_TO_CONSOLE "" -ENV START_SUB_REQ_HTTP_GET "" -ENV START_SUB_REQ_HTTP_GET_SCHEDULED "" -ENV START_SUB_REQ_TAIL_FILE "" -ENV START_SUB_REQ_CLI_COMMAND_CONT "" +ENV IS_CENTRAL_ERROR_LOGGER "0" +ENV START_SUB_REQ_HELLO "1" +ENV START_SUB_REQ_TO_FILE_APPEND "1" +ENV START_SUB_REQ_TO_FILE "1" +ENV START_SUB_REQ_TO_FILE_NACK "1" +ENV START_SUB_REQ_COPY_SRC "1" +ENV START_SUB_REQ_COPY_DST "1" +ENV START_SUB_REQ_CLI_COMMAND "1" +ENV START_SUB_REQ_TO_CONSOLE "1" +ENV START_SUB_REQ_HTTP_GET "1" +ENV START_SUB_REQ_HTTP_GET_SCHEDULED "1" +ENV START_SUB_REQ_TAIL_FILE "1" +ENV START_SUB_REQ_CLI_COMMAND_CONT "1" CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\ - -ringBufferPersistStore=${RING_BUFFER_PERSIST_STORE}\ - -ringBufferSize=${RING_BUFFER_SIZE}\ -socketFolder=${SOCKET_FOLDER}\ -tcpListener=${TCP_LISTENER}\ -httpListener=${HTTP_LISTENER}\ @@ -98,6 +93,7 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\ -centralNodeName=${CENTRAL_NODE_NAME}\ -rootCAPath=${ROOT_CA_PATH}\ -nkeySeedFile=${NKEY_SEED_FILE}\ + -nkeySeed=${NKEY_SEED}\ -exposeDataFolder=${EXPOSE_DATA_FOLDER}\ -errorMessageRetries=${ERROR_MESSAGE_RETRIES}\ -errorMessageTimeout=${ERROR_MESSAGE_TIMEOUT}\ @@ -120,7 +116,6 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\ -startSubREQCopySrc=${START_SUB_REQ_COPY_SRC}\ -startSubREQCopyDst=${START_SUB_REQ_COPY_DST}\ -startSubREQToFileNACK=${START_SUB_REQ_TO_FILE_NACK}\ - -startSubREQPong=${START_SUB_REQ_PONG}\ -startSubREQCliCommand=${START_SUB_REQ_CLI_COMMAND}\ -startSubREQToConsole=${START_SUB_REQ_TO_CONSOLE}\ -startSubREQHttpGet=${START_SUB_REQ_HTTP_GET}\ diff --git a/central_auth_acl_handling.go b/central_auth_acl_handling.go index 55f129c..407672e 100644 --- a/central_auth_acl_handling.go +++ b/central_auth_acl_handling.go @@ -308,7 +308,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { // defer a.schemaMain.mu.Unlock() enc := json.NewEncoder(fh) enc.SetEscapeHTML(false) - enc.Encode(c.accessLists.schemaMain.ACLMap) + err = enc.Encode(c.accessLists.schemaMain.ACLMap) if err != nil { er := fmt.Errorf("error: generateACLsForAllNodes: encoding json to file failed: %v, err: %v", c.accessLists.schemaMain.ACLMapFilePath, err) c.errorKernel.logError(er, c.configuration) diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index 2f2f74a..982c21f 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -69,7 +69,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { p := pki{ // schema: make(map[Node]map[argsString]signatureBase32), nodesAcked: newNodesAcked(), - nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration), + nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(), configuration: configuration, bucketNamePublicKeys: "publicKeys", errorKernel: errorKernel, @@ -305,7 +305,7 @@ func (c *centralAuth) updateHash(proc process, message Message) { c.pki.nodesAcked.keysAndHash.Hash = hash // Store the key to the db for persistence. - c.pki.dbUpdateHash(hash[:]) + err = c.pki.dbUpdateHash(hash[:]) if err != nil { er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err) c.pki.errorKernel.errSend(proc, message, er, logError) @@ -398,7 +398,7 @@ type nodeNotAckedPublicKeys struct { } // newNodeNotAckedPublicKeys will return a prepared type of nodePublicKeys. -func newNodeNotAckedPublicKeys(configuration *Configuration) *nodeNotAckedPublicKeys { +func newNodeNotAckedPublicKeys() *nodeNotAckedPublicKeys { n := nodeNotAckedPublicKeys{ KeyMap: make(map[Node][]byte), } diff --git a/configuration_flags.go b/configuration_flags.go index dd004c0..a5c0a4b 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -69,6 +69,8 @@ type Configuration struct { NkeyPublicKey string `toml:"-"` // NkeyFromED25519SSHKeyFile string `comment:"Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified"` + // NkeySeed + NkeySeed string `toml:"-"` // The host and port to expose the data folder, : ExposeDataFolder string `comment:"The host and port to expose the data folder, :"` // Timeout in seconds for error messages @@ -169,6 +171,7 @@ type ConfigurationFromFile struct { RootCAPath *string NkeySeedFile *string NkeyFromED25519SSHKeyFile *string + NkeySeed *string ExposeDataFolder *string ErrorMessageTimeout *int ErrorMessageRetries *int @@ -236,6 +239,7 @@ func newConfigurationDefaults() Configuration { RootCAPath: "", NkeySeedFile: "", NkeyFromED25519SSHKeyFile: "", + NkeySeed: "", ExposeDataFolder: "", ErrorMessageTimeout: 60, ErrorMessageRetries: 10, @@ -402,6 +406,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.NkeyFromED25519SSHKeyFile = *cf.NkeyFromED25519SSHKeyFile } + if cf.NkeySeed == nil { + conf.NkeySeed = cd.NkeySeed + } else { + conf.NkeySeed = *cf.NkeySeed + } if cf.ExposeDataFolder == nil { conf.ExposeDataFolder = cd.ExposeDataFolder } else { @@ -613,6 +622,7 @@ func (c *Configuration) CheckFlags(version string) error { flag.StringVar(&c.RootCAPath, "rootCAPath", fc.RootCAPath, "If TLS, enter the path for where to find the root CA certificate") flag.StringVar(&c.NkeyFromED25519SSHKeyFile, "nkeyFromED25519SSHKeyFile", fc.NkeyFromED25519SSHKeyFile, "The full path of the nkeys seed file") flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", fc.NkeySeedFile, "Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified") + flag.StringVar(&c.NkeySeed, "nkeySeed", fc.NkeySeed, "The actual nkey seed. To use if not stored in file") flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out") flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it") diff --git a/go.mod b/go.mod index 7bcac33..30a3ae9 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/postmannen/ctrl -go 1.21 +go 1.22 require ( github.com/fsnotify/fsnotify v1.6.0 diff --git a/node_auth.go b/node_auth.go index 7a0a56c..d2644d2 100644 --- a/node_auth.go +++ b/node_auth.go @@ -154,7 +154,7 @@ func (n *nodeAcl) saveToFile() error { enc := json.NewEncoder(fh) enc.SetEscapeHTML(false) - enc.Encode(n.aclAndHash) + err = enc.Encode(n.aclAndHash) // HERE // b, err := json.Marshal(n.aclAndHash) diff --git a/process.go b/process.go index 39e38dc..811f141 100644 --- a/process.go +++ b/process.go @@ -125,7 +125,7 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process { +func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { // create the initial configuration for a sessions communicating with 1 host process. server.processes.mu.Lock() server.processes.lastProcessID++ diff --git a/processes.go b/processes.go index cb6464f..b4ac3eb 100644 --- a/processes.go +++ b/processes.go @@ -390,7 +390,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p } fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) - proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil) + proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) proc.procFunc = pf go proc.spawnWorker() @@ -400,7 +400,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr er := fmt.Errorf("starting %v publisher: %#v", m, p.node) p.errorKernel.logDebug(er, p.configuration) sub := newSubject(m, string(p.node)) - proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher, nil) + proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher) proc.procFunc = pf proc.isLongRunningPublisher = true diff --git a/requests_copy.go b/requests_copy.go index 791d607..3d43a8b 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -225,14 +225,14 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error // Create a new sub process that will do the actual file copying. - copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil) + copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message) // assign a handler to the sub process - copySrcSubProc.handler = copySrcSubHandler(cia) + copySrcSubProc.handler = copySrcSubHandler() // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() @@ -280,8 +280,8 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error } // newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. -func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process { - p := newProcess(ctx, server, subject, processKind, procFunc) +func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process { + p := newProcess(ctx, server, subject, processKind) p.isSubProcess = true return p @@ -352,14 +352,14 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error } // Create a new sub process that will do the actual file copying. - copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil) + copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel) // assign a handler to the sub process - copyDstSubProc.handler = copyDstSubHandler(cia) + copyDstSubProc.handler = copyDstSubHandler() // The process will be killed when the context expires. go copyDstSubProc.spawnWorker() @@ -375,7 +375,7 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error return ackMsg, nil } -func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { +func copySrcSubHandler() func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { // We should receive a ready message generated by the procFunc of Dst, @@ -396,7 +396,7 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by return h } -func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { +func copyDstSubHandler() func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { select { diff --git a/requests_operator.go b/requests_operator.go index dd0db47..12fd5f5 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -68,7 +68,7 @@ func methodREQOpProcessStart(proc process, message Message, node string) ([]byte // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil) + procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) go procNew.spawnWorker() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) diff --git a/requests_test.go b/requests_test.go index 157473b..db3a1c5 100644 --- a/requests_test.go +++ b/requests_test.go @@ -322,7 +322,7 @@ func TestRequest(t *testing.T) { case fileContains: resultFile := filepath.Join(tstConf.SubscribersDataFolder, tt.message.Directory, string(tt.message.FromNode), tt.message.FileName) - found, err := findStringInFileTest(string(tt.want), resultFile, tstConf, t) + found, err := findStringInFileTest(string(tt.want), resultFile) if err != nil || found == false { t.Fatalf(" \U0001F631 [FAILED] : %v: %v\n", tt.info, err) @@ -334,14 +334,14 @@ func TestRequest(t *testing.T) { // --- Other REQ tests that does not fit well into the general table above. - checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir) - checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir) - checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir) - checkREQCopySrc(tstSrv, tstConf, t, tstTempDir) + checkREQTailFileTest(tstConf, t, tstTempDir) + checkMetricValuesTest(tstSrv, t) + checkErrorKernelMalformedJSONtest(tstConf, t) + checkREQCopySrc(tstConf, t, tstTempDir) } // Check the tailing of files type. -func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error { +func checkREQTailFileTest(conf *Configuration, t *testing.T, tmpDir string) error { // Create a file with some content. fp := filepath.Join(tmpDir, "test.file") fh, err := os.OpenFile(fp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660) @@ -407,7 +407,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T, cancel() - _, err = findStringInFileTest("some file content", resultFile, conf, t) + _, err = findStringInFileTest("some file content", resultFile) if err != nil { return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: %v", err) } @@ -417,7 +417,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T, } // Check the file copier. -func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error { +func checkREQCopySrc(conf *Configuration, t *testing.T, tmpDir string) error { testFiles := 5 for i := 1; i <= testFiles; i++ { @@ -474,7 +474,7 @@ func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpD return nil } -func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error { +func checkMetricValuesTest(ctrlServer *server, t *testing.T) error { mfs, err := ctrlServer.metrics.promRegistry.Gather() if err != nil { return fmt.Errorf("error: promRegistry.gathering: %v", mfs) @@ -507,7 +507,7 @@ func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T } // Check errorKernel -func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error { +func checkErrorKernelMalformedJSONtest(conf *Configuration, t *testing.T) error { // JSON message with error, missing brace. m := `[ @@ -550,7 +550,7 @@ func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration, case <-chUpdated: // We got an update, so we continue to check if we find the string we're // looking for. - found, err := findStringInFileTest("error: malformed json", resultFile, conf, t) + found, err := findStringInFileTest("error: malformed json", resultFile) if !found && err != nil { return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: %v", err) } @@ -605,7 +605,7 @@ func checkFileUpdated(fileRealPath string, fileUpdated chan bool) { } // Check if a file contains the given string. -func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) { +func findStringInFileTest(want string, fileName string) (bool, error) { // Wait n seconds for the results file to be created n := 50 diff --git a/server.go b/server.go index 9f69c04..fd32b32 100644 --- a/server.go +++ b/server.go @@ -97,7 +97,39 @@ func NewServer(configuration *Configuration, version string) (*server, error) { opt = nats.RootCAs(configuration.RootCAPath) } - if configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "" { + switch { + case configuration.NkeySeed != "": + cwd, err := os.Getwd() + if err != nil { + cancel() + return nil, fmt.Errorf("error: failed to get current working directory when creating tmp seed file: %v", err) + } + + pth := filepath.Join(cwd, "seed.txt") + + // f, err := os.CreateTemp(pth, "") + // if err != nil { + // return nil, fmt.Errorf("error: failed to create tmp seed file: %v", err) + // } + + err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0700) + if err != nil { + cancel() + return nil, fmt.Errorf("error: failed to write temp seed file: %v", err) + } + + opt, err = nats.NkeyOptionFromSeed(pth) + if err != nil { + cancel() + return nil, fmt.Errorf("error: failed to read temp nkey seed file: %v", err) + } + err = os.Remove(pth) + if err != nil { + cancel() + return nil, fmt.Errorf("error: failed to remove temp seed file: %v", err) + } + + case configuration.NkeySeedFile != "" && configuration.NkeyFromED25519SSHKeyFile == "": var err error opt, err = nats.NkeyOptionFromSeed(configuration.NkeySeedFile) @@ -105,9 +137,8 @@ func NewServer(configuration *Configuration, version string) (*server, error) { cancel() return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err) } - } - if configuration.NkeyFromED25519SSHKeyFile != "" { + case configuration.NkeyFromED25519SSHKeyFile != "": var err error opt, err = configuration.nkeyOptFromSSHKey() @@ -115,6 +146,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { cancel() return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err) } + } var conn *nats.Conn @@ -302,7 +334,7 @@ func (s *server) Start() { // // NB: The context of the initial process are set in processes.Start. sub := newSubject(REQInitial, s.nodeName) - s.processInitial = newProcess(context.TODO(), s, sub, "", nil) + s.processInitial = newProcess(context.TODO(), s, sub, "") // Start all wanted subscriber processes. s.processes.Start(s.processInitial) @@ -312,7 +344,7 @@ func (s *server) Start() { // Start exposing the the data folder via HTTP if flag is set. if s.configuration.ExposeDataFolder != "" { log.Printf("info: Starting expose of data folder via HTTP\n") - go s.exposeDataFolder(s.ctx) + go s.exposeDataFolder() } // Start the processing of new messages from an input channel. @@ -539,9 +571,9 @@ func (s *server) routeMessagesToProcess() { var proc process switch { case m.IsSubPublishedMsg: - proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil) + proc = newSubProcess(s.ctx, s, sub, processKindPublisher) default: - proc = newProcess(s.ctx, s, sub, processKindPublisher, nil) + proc = newProcess(s.ctx, s, sub, processKindPublisher) } proc.spawnWorker() @@ -565,7 +597,7 @@ func (s *server) routeMessagesToProcess() { }() } -func (s *server) exposeDataFolder(ctx context.Context) { +func (s *server) exposeDataFolder() { fileHandler := func(w http.ResponseWriter, r *http.Request) { // w.Header().Set("Content-Type", "text/html") http.FileServer(http.Dir(s.configuration.SubscribersDataFolder)).ServeHTTP(w, r)