From 2fb43591cef377780b4c5514a4bafe3b34f63285 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 27 Mar 2024 12:48:17 +0100 Subject: [PATCH] updated readme removed example for no longer existing relay messages cleaned up comments Removed some remaings after REQToFileNACK Implemented env variables for all flags, and removed config flag. Also added use of .env file. removed configuration as input argument from all the loggers replaced logging of new messages in read folder with a logDebug so we don't send those messages to the error kernel --- Dockerfile | 107 +-- README.md | 107 +-- central_auth_acl_handling.go | 50 +- central_auth_key_handling.go | 18 +- cmd/ctrl/main.go | 10 +- configuration_flags.go | 633 +++--------------- doc/nats-server-relay-configuration/README.md | 49 -- errorkernel.go | 22 +- go.mod | 2 +- go.sum | 5 +- message_and_subject.go | 8 - message_readers.go | 20 +- node_auth.go | 32 +- process.go | 82 +-- processes.go | 26 +- requests.go | 7 - requests_acl.go | 62 +- requests_cli.go | 8 +- requests_copy.go | 32 +- requests_file_handling.go | 6 +- requests_http.go | 10 +- requests_keys.go | 32 +- requests_std.go | 4 +- scripts/mini-steward/test.sh | 2 +- server.go | 20 +- 25 files changed, 342 insertions(+), 1012 deletions(-) delete mode 100644 doc/nats-server-relay-configuration/README.md diff --git a/Dockerfile b/Dockerfile index 6b1955c..e8ada15 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,109 +17,4 @@ RUN apk update && apk add curl && apk add nmap WORKDIR /app COPY --from=build-env /build/cmd/ctrl/ctrl /app/ -ENV CONFIG_FOLDER "./etc" -ENV SOCKET_FOLDER "./tmp" -ENV TCP_LISTENER "" -ENV HTTP_LISTENER "localhost:8091" -ENV DATABASE_FOLDER "./var/lib" -ENV NODE_NAME "" -ENV BROKER_ADDRESS "127.0.0.1:4222" -ENV NATS_CONN_OPT_TIMEOUT "20" -ENV NATS_CONNECT_RETRY_INTERVAL "10" -ENV NATS_RECONNECT_JITTER "100" -ENV NATS_RECONNECT_JITTER_TLS "1" -ENV REQ_KEYS_REQUEST_UPDATE_INTERVAL "60" -ENV REQ_ACL_REQUEST_UPDATE_INTERVAL "60" -ENV PROFILING_PORT "" -ENV PROM_HOST_AND_PORT "127.0.0.1:2111" -ENV DEFAULT_MESSAGE_TIMEOUT 10 -ENV DEFAULT_MESSAGE_RETRIES 3 -ENV DEFAULT_METHOD_TIMEOUT 10 -ENV SUBSCRIBERS_DATA_FOLDER "./var" -ENV CENTRAL_NODE_NAME "central" -ENV ROOT_CA_PATH "" -ENV NKEY_SEED_FILE "" -ENV NKEY_SEED "" -ENV EXPOSE_DATA_FOLDER "127.0.0.1:8090" -ENV ERROR_MESSAGE_RETRIES 3 -ENV ERROR_MESSAGE_TIMEOUT 10 -ENV COMPRESSION "" -ENV SERIALIZATION "" -ENV SET_BLOCK_PROFILE_RATE "0" -ENV ENABLE_SOCKET "1" -ENV ENABLE_SIGNATURE_CHECK "0" -ENV ENABLE_ACL_CHECK "0" -ENV IS_CENTRAL_AUTH "0" -ENV ENABLE_DEBUG "0" -ENV KEEP_PUBLISHERS_ALIVE_FOR "10" - -ENV START_PUB_REQ_HELLO 60 - -ENV ENABLE_KEY_UPDATES "1" -ENV ENABLE_ACL_UPDATES "1" -ENV IS_CENTRAL_ERROR_LOGGER "0" -ENV START_SUB_REQ_HELLO "1" -ENV START_SUB_REQ_TO_FILE_APPEND "1" -ENV START_SUB_REQ_TO_FILE "1" -ENV START_SUB_REQ_TO_FILE_NACK "1" -ENV START_SUB_REQ_COPY_SRC "1" -ENV START_SUB_REQ_COPY_DST "1" -ENV START_SUB_REQ_CLI_COMMAND "1" -ENV START_SUB_REQ_TO_CONSOLE "1" -ENV START_SUB_REQ_HTTP_GET "1" -ENV START_SUB_REQ_HTTP_GET_SCHEDULED "1" -ENV START_SUB_REQ_TAIL_FILE "1" -ENV START_SUB_REQ_CLI_COMMAND_CONT "1" - -CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/ctrl\ - -socketFolder=${SOCKET_FOLDER}\ - -tcpListener=${TCP_LISTENER}\ - -httpListener=${HTTP_LISTENER}\ - -databaseFolder=${DATABASE_FOLDER}\ - -nodeName=${NODE_NAME}\ - -brokerAddress=${BROKER_ADDRESS}\ - -natsConnOptTimeout=${NATS_CONN_OPT_TIMEOUT}\ - -natsConnectRetryInterval=${NATS_CONNECT_RETRY_INTERVAL}\ - -natsReconnectJitter=${NATS_RECONNECT_JITTER}\ - -natsReconnectJitterTLS=${NATS_RECONNECT_JITTER_TLS}\ - -REQKeysRequestUpdateInterval=${REQ_KEYS_REQUEST_UPDATE_INTERVAL}\ - -REQAclRequestUpdateInterval=${REQ_ACL_REQUEST_UPDATE_INTERVAL}\ - -profilingPort=${PROFILING_PORT}\ - -promHostAndPort=${PROM_HOST_AND_PORT}\ - -defaultMessageTimeout=${DEFAULT_MESSAGE_TIMEOUT}\ - -defaultMessageRetries=${DEFAULT_MESSAGE_RETRIES}\ - -defaultMethodTimeout=${DEFAULT_METHOD_TIMEOUT}\ - -subscribersDataFolder=${SUBSCRIBERS_DATA_FOLDER}\ - -centralNodeName=${CENTRAL_NODE_NAME}\ - -rootCAPath=${ROOT_CA_PATH}\ - -nkeySeedFile=${NKEY_SEED_FILE}\ - -nkeySeed=${NKEY_SEED}\ - -exposeDataFolder=${EXPOSE_DATA_FOLDER}\ - -errorMessageRetries=${ERROR_MESSAGE_RETRIES}\ - -errorMessageTimeout=${ERROR_MESSAGE_TIMEOUT}\ - -compression=${COMPRESSION}\ - -serialization=${SERIALIZATION}\ - -setBlockProfileRate=${SET_BLOCK_PROFILE_RATE}\ - -enableSocket=${ENABLE_SOCKET}\ - -enableSignatureCheck=${ENABLE_SIGNATURE_CHECK}\ - -enableAclCheck=${ENABLE_ACL_CHECK}\ - -isCentralAuth=${IS_CENTRAL_AUTH}\ - -enableDebug=${ENABLE_DEBUG}\ - -keepPublishersAliveFor=${KEEP_PUBLISHERS_ALIVE_FOR}\ - -startPubREQHello=${START_PUB_REQ_HELLO}\ - -EnableKeyUpdates=${ENABLE_KEY_UPDATES}\ - -EnableAclUpdates=${ENABLE_ACL_UPDATES}\ - -isCentralErrorLogger=${IS_CENTRAL_ERROR_LOGGER}\ - -startSubREQHello=${START_SUB_REQ_HELLO}\ - -startSubREQToFileAppend=${START_SUB_REQ_TO_FILE_APPEND}\ - -startSubREQToFile=${START_SUB_REQ_TO_FILE}\ - -startSubREQCopySrc=${START_SUB_REQ_COPY_SRC}\ - -startSubREQCopyDst=${START_SUB_REQ_COPY_DST}\ - -startSubREQToFileNACK=${START_SUB_REQ_TO_FILE_NACK}\ - -startSubREQCliCommand=${START_SUB_REQ_CLI_COMMAND}\ - -startSubREQToConsole=${START_SUB_REQ_TO_CONSOLE}\ - -startSubREQHttpGet=${START_SUB_REQ_HTTP_GET}\ - -startSubREQHttpGetScheduled=${START_SUB_REQ_HTTP_GET_SCHEDULED}\ - -startSubREQTailFile=${START_SUB_REQ_TAIL_FILE}\ - -startSubREQCliCommandCont=${START_SUB_REQ_CLI_COMMAND_CONT}\ - "] +CMD ["ash","-c","/app/ctrl"] diff --git a/README.md b/README.md index 5ac2535..7625cda 100644 --- a/README.md +++ b/README.md @@ -20,16 +20,14 @@ As long as you can do something as an operator on in a shell on a system you can - [Ctrl](#ctrl) - [Intro](#intro) - [Example](#example) - - [Disclaimer](#disclaimer) - [Overview](#overview) - [Example of message flow](#example-of-message-flow) - [Inspiration](#inspiration) - - [Why](#why) + - [Why ctrl was created](#why-ctrl-was-created) - [Publishing and Subscribing processes](#publishing-and-subscribing-processes) - [Publisher](#publisher) - [Subscriber](#subscriber) - [Load balancing](#load-balancing) - - [Logical structure](#logical-structure) - [Terminology](#terminology) - [Features](#features) - [Input methods](#input-methods) @@ -57,7 +55,6 @@ As long as you can do something as an operator on in a shell on a system you can - [REQCliCommandCont](#reqclicommandcont) - [REQTailFile](#reqtailfile) - [REQHttpGet](#reqhttpget) - - [REQHttpGetScheduled](#reqhttpgetscheduled) - [REQHello](#reqhello) - [REQCopySrc](#reqcopysrc) - [REQErrorLog](#reqerrorlog) @@ -117,6 +114,7 @@ As long as you can do something as an operator on in a shell on a system you can - [Subject](#subject) - [Complete subject example](#complete-subject-example) - [History](#history) + - [Disclaimer](#disclaimer) ## Example @@ -144,23 +142,15 @@ If the receiver `toNode` is down when the message was sent, it will be **retried Since the initial connection from a ctrl node is outbound towards the central NATS message broker no inbound firewall openings are needed. -## Disclaimer - -All code in this repository are to be concidered not-production-ready, and the use is at your own responsibility and risk. The code are the attempt to concretize the idea of a purely async management system where the controlling unit is decoupled from the receiving unit, and that that we know the state of all the receiving units at all times. - -Also read the license file for further details. - -Expect the main branch to have breaking changes. If stability is needed, use the released packages, and read the release notes where changes will be explained. - ## Overview -Send Commands with Request Methods to control your servers by passing a messages that will have guaranteed delivery based on the criteries set, and when/if the receiving node is available. The result of the method executed will be delivered back to you from the node you sent it from. +Send Commands with Request Methods to control your servers by passing a messages. If a receiving node is down, the message will be retried with the criterias set within the message body. The result of the method executed will be delivered back to you from the node you sent it from. -ctrl uses **NATS** as message passing architecture for the commands back and forth from nodes. Delivery is guaranteed within the criterias set. All of the processes in the system are running concurrently, so if something breaks or some process is slow it will not affect the handling and delivery of the other messages in the system. +ctrl uses **NATS** as message passing architecture for the commands back and forth from nodes. Delivery is guaranteed within the criterias set. All of the processes in the system are running concurrently. If some process is slow or fails it will not affect the handling and delivery of the other messages in the system. -A node can be a server running any host operating system, a container living in the cloud somewhere, a Rapsberry Pi, or something else that needs to be controlled that have an operating system installed. +**ctrl** can be run on almost any host operating system, containers living in the cloud somewhere, a Rapsberry Pi, or something else that needs to be controlled that have an operating system installed. -ctrl can be compiled to run on all major architectures like **x86**, **amd64**,**arm64**, **ppc64** and more, with for example operating systems like **Linux**, **OSX**, **Windows**. +ctrl can be compiled to run on most major architectures like **x86**, **amd64**,**arm64**, **ppc64** and more, with for example operating systems like **Linux**, **OSX**, **Windows**. ### Example of message flow @@ -176,7 +166,7 @@ I used those ideas as inspiration for building a fully concurrent system to cont ctrl is written in the programming language Go with NATS as the message broker. -## Why +## Why ctrl was created With existing solutions there is often either a push or a pull kind of setup to control the nodes. @@ -194,7 +184,7 @@ If one process hangs on a long running message method it will not affect the res ### Publisher -1. A message in valid format is appended to one of the input methods. Available inputs are Unix Socket listener, TCP listener, and File Reader. +1. A message in valid format is appended to one of the input methods. Available inputs are Unix Socket listener, TCP listener, and File Reader (**readfolder**). 2. The message is picked up by the system. 3. The method type of the message is checked, a subject is created based on the content of the message, and a publisher process to handle the message type for that specific receiving node is started if it does not exist. 4. The message is then serialized to binary format, and sent to the subscriber on the receiving node. @@ -210,21 +200,17 @@ If one process hangs on a long running message method it will not affect the res ctrl instances with the same **Nodename** will automatically load balance the handling of messages on a given subject, and any given message will only be handled once by one instance. -### Logical structure - -TODO: Make a diagram here... - ## Terminology -- **Node**: Something with an operating system that have network available. This can be a server, a cloud instance, a container, or other. +- **Node**: An instance of **ctrl** running on an operating system that have network available. This can be a server, a cloud instance, a container, or other. - **Process**: A message handler that knows how to handle messages of a given subject concurrently. -- **Message**: A message sent from one ctrl node to another. +- **Message**: A message sent from one **ctrl** node to another. ## Features ### Input methods -New Request Messages in Json/Yaml format can be delivered by the user to ctrl in the following ways: +New Request Messages in Json/Yaml format can be injected by the user to ctrl in the following ways: - **Unix Socket**. Use for example netcat or another tool to deliver new messages to a socket like `nc -U tmp/ctrl.sock < msg.yaml`. - **Read Folder**. Write/Copy messages to be delivered to the `readfolder` of ctrl. @@ -242,13 +228,13 @@ The error logs can be read on the central server in the directory `/d ### Message handling and threads -- The handling of all messages is done by spawning up a process for handling the message in it's own thread. This allows us to down on the **individual message level** keep the state for each message both in regards to ACK's, error handling, send retries, and rerun of a method for a message if the first run was not successful. +- The handling of all messages is done by spawning up a process for handling the message in it's own thread. This allows us to keep the state of each **individual message level** both in regards to ACK's, error handling, send retries, and reruns of methods for a message if the first run was not successful. - Processes for handling messages on a host can be **restarted** upon **failure**, or asked to just terminate and send a message back to the operator that something have gone seriously wrong. This is right now just partially implemented to test that the concept works, where the error action is **action=no-action**. - Publisher Processes on a node for handling new messages for new nodes will automatically be spawned when needed if it does not already exist. -- Messages not fully processed or not started yet will be automatically rehandled if the service is restarted since the current state of all the messages being processed are stored on the local node in a **key value store** until they are finished. +- If enabled, messages not fully processed or not started yet will be automatically rehandled if the service is restarted since the current state of all the messages being processed are stored on the local node in a **key value store** until they are finished. - All messages processed by a publisher will be written to a log file after they are processed, with all the information needed to recreate the same message if needed, or it can be used for auditing. @@ -259,7 +245,7 @@ Example: We probably want an **ACK** when sending some **REQCLICommand** to be e If a message are **ACK** or **NACK** type are defined by the value of the **ACKTimeout** for each individual message: 1. **ACKTimeout** set to 0 will make the message become a **NACK** message. - 1. **ACKTimeout** set to >=1 will make the message become an **ACK** message. + 2. **ACKTimeout** set to >=1 will make the message become an **ACK** message. ### Timeouts and retries for requests @@ -327,19 +313,7 @@ The flow will be like this: ### Flags and configuration file -ctrl supports both the use of flags with values set at startup, and the use of a config file. - -- A default config file will be created at first startup if one does not exist - - The default config will contain default values. - - Any value also provided via a flag will also be written to the config file. -- If **ctrl** is restarted, the current content of the config file will be used as the new defaults. - - If you restart ctrl without any flags specified, the values of the last run will be read from the config file. -- If new values are provided via CLI flags, they will take **precedence** over the ones currently in the config file. - - The new CLI flag values will be written to the config, making it the default for the next restart. -- The config file can be edited directly, removing the need for CLI flag use. -- To create a default config, simply: - 1. Remove the current config file (or move it). - 2. Restart ctrl. A new default config file, with default values, will be created. +ctrl supports both the use of flags with env variables. An .env file can also be used. ### Schema for the messages to send into ctrl via the API's @@ -361,9 +335,9 @@ ctrl supports both the use of flags with values set at startup, and the use of a ### Nats messaging timeouts -The various timeouts for the Nats messages can be controlled via the configuration file or flags. +The various timeouts for the messages can be controlled via the configuration file or flags. -If the network media is a high latency. satellite links it will make sense to adjust the client timeout to reflect the latency +If the network media is a high latency like satellite links, it will make sense to adjust the client timeout to reflect the latency ```text -natsConnOptTimeout int @@ -413,19 +387,13 @@ To enable **CBOR** serialization either start **ctrl** by setting the serializat ./ctrl -serialization="cbor" ``` -Or edit the config file `/etc/config.toml` and set: - -```toml -Serialization = "cbor" -``` - ### startup folder #### General functionality Messages can be automatically scheduled to be read and executed at startup of ctrl. -A folder named **startup** will be present in the working directory of ctrl, and you put the messages to be executed at startup here. +A folder named **startup** will be present in the working directory of ctrl. To inject messages at startup, put them here. Messages put in the startup folder will not be sent to the broker but handled locally, and only (eventually) the reply message from the Request Method called will be sent to the broker. @@ -657,35 +625,6 @@ Scrape web url, and get the html sent back in a reply message. Uses the methodTi ] ``` -#### REQHttpGetScheduled - -**REQ Method are DEPRECATED** -Schedule scraping of a web web url, and get the html sent back in a reply message. Uses the methodTimeout for how long it will wait for the http get method to return result. - -The **methodArgs** also takes 3 arguments: - - 1. The URL to scrape. - 2. The schedule interval given in **seconds**. - 3. How long the scheduler should run in minutes. - -The example below will scrape the URL specified every 30 seconds for 10 minutes. - -```json -[ - { - "directory": "web", - "fileName": "web.html", - "toNode": "ship2", - "method":"REQHttpGet", - "methodArgs": ["https://web.ics.purdue.edu/~gchopra/class/public/pages/webdesign/05_simple.html","30","10"], - "replyMethod":"REQToFile", - "ACKTimeout":10, - "retries": 3, - "methodTimeout": 3 - } -] -``` - #### REQHello Send Hello messages. @@ -862,7 +801,7 @@ Or the same using bash's herestring: ### Errors reporting -- Errors happening on **all** nodes will be reported back in to the node name defined with the `-centralNodeName` flag. +- Errors happening on **all** nodes will be reported back to the node(s) started with the flag `-isCentralErrorLogger` set to true. ### Prometheus metrics @@ -1372,3 +1311,11 @@ ctrl is the continuation of the code I earlier wrote for RaaLabs called Steward. This started out as an idea I had for how to control infrastructure. This is the continuation of the same idea, and a project I'm working on free of charge in my own spare time, so please be gentle :) NB: Filing of issues and bug fixes are highly appreciated. Feature requests will genereally not be followed up simply because I don't have the time to review it at this time : + +## Disclaimer + +All code in this repository are to be concidered not-production-ready, and the use is at your own responsibility and risk. The code are the attempt to concretize the idea of a purely async management system where the controlling unit is decoupled from the receiving unit, and that that we know the state of all the receiving units at all times. + +Also read the license file for further details. + +Expect the main branch to have breaking changes. If stability is needed, use the released packages, and read the release notes where changes will be explained. diff --git a/central_auth_acl_handling.go b/central_auth_acl_handling.go index 407672e..de9e584 100644 --- a/central_auth_acl_handling.go +++ b/central_auth_acl_handling.go @@ -82,7 +82,7 @@ func newSchemaMain(configuration *Configuration, errorKernel *errorKernel) *sche func() { if _, err := os.Stat(s.ACLMapFilePath); os.IsNotExist(err) { er := fmt.Errorf("info: newSchemaMain: no file for ACLMap found, will create new one, %v: %v", s.ACLMapFilePath, err) - errorKernel.logInfo(er, configuration) + errorKernel.logInfo(er) // If no aclmap is present on disk we just return from this // function without loading any values. @@ -92,20 +92,20 @@ func newSchemaMain(configuration *Configuration, errorKernel *errorKernel) *sche fh, err := os.Open(s.ACLMapFilePath) if err != nil { er := fmt.Errorf("error: newSchemaMain: failed to open file for reading %v: %v", s.ACLMapFilePath, err) - errorKernel.logError(er, configuration) + errorKernel.logError(er) } b, err := io.ReadAll(fh) if err != nil { er := fmt.Errorf("error: newSchemaMain: failed to ReadAll file %v: %v", s.ACLMapFilePath, err) - errorKernel.logError(er, configuration) + errorKernel.logError(er) } // Unmarshal the data read from disk. err = json.Unmarshal(b, &s.ACLMap) if err != nil { er := fmt.Errorf("error: newSchemaMain: failed to unmarshal content from file %v: %v", s.ACLMapFilePath, err) - errorKernel.logError(er, configuration) + errorKernel.logError(er) } // Generate the aclGenerated map happens in the function where this function is called. @@ -225,7 +225,7 @@ func (c *centralAuth) aclAddCommand(host Node, source Node, cmd command) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: addCommandForFromNode: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } // fmt.Printf(" * DEBUG: aclNodeFromnodeCommandAdd: a.schemaMain.ACLMap=%v\n", a.schemaMain.ACLMap) @@ -255,7 +255,7 @@ func (c *centralAuth) aclDeleteCommand(host Node, source Node, cmd command) erro err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: aclNodeFromNodeCommandDelete: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } return nil @@ -280,7 +280,7 @@ func (c *centralAuth) aclDeleteSource(host Node, source Node) error { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: aclNodeFromnodeDelete: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } return nil @@ -299,7 +299,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { fh, err := os.OpenFile(c.accessLists.schemaMain.ACLMapFilePath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0660) if err != nil { er := fmt.Errorf("error: generateACLsForAllNodes: opening file for writing: %v, err: %v", c.accessLists.schemaMain.ACLMapFilePath, err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } defer fh.Close() @@ -311,7 +311,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { err = enc.Encode(c.accessLists.schemaMain.ACLMap) if err != nil { er := fmt.Errorf("error: generateACLsForAllNodes: encoding json to file failed: %v, err: %v", c.accessLists.schemaMain.ACLMapFilePath, err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } }() @@ -334,7 +334,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { } inf := fmt.Errorf("generateACLsFor all nodes, ACLsToConvert contains: %#v", c.accessLists.schemaGenerated.ACLsToConvert) - c.accessLists.errorKernel.logDebug(inf, c.accessLists.configuration) + c.accessLists.errorKernel.logDebug(inf) // ACLsToConvert got the complete picture of what ACL's that // are defined for each individual host node. @@ -355,7 +355,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { cb, err := cbor.Marshal(m) if err != nil { er := fmt.Errorf("error: generateACLsForAllNodes: failed to generate cbor for host in schemaGenerated: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) os.Exit(1) } @@ -366,7 +366,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { b, err := cbor.Marshal(sns) if err != nil { er := fmt.Errorf("error: generateACLsForAllNodes: failed to generate cbor for hash: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return [32]byte{} } @@ -387,7 +387,7 @@ func (c *centralAuth) generateACLsForAllNodes() error { }() inf = fmt.Errorf("generateACLsFor all nodes, GeneratedACLsMap contains: %#v", c.accessLists.schemaGenerated.GeneratedACLsMap) - c.accessLists.errorKernel.logDebug(inf, c.accessLists.configuration) + c.accessLists.errorKernel.logDebug(inf) return nil } @@ -456,7 +456,7 @@ func (c *centralAuth) groupNodesAddNode(ng nodeGroup, n Node) { if !strings.HasPrefix(string(ng), "grp_nodes_") { er := fmt.Errorf("error: group name do not start with grp_nodes_") - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -473,7 +473,7 @@ func (c *centralAuth) groupNodesAddNode(ng nodeGroup, n Node) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupNodesAddNode: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } @@ -484,7 +484,7 @@ func (c *centralAuth) groupNodesDeleteNode(ng nodeGroup, n Node) { defer c.accessLists.schemaMain.mu.Unlock() if _, ok := c.accessLists.schemaMain.NodeGroupMap[ng][n]; !ok { er := fmt.Errorf("info: no such node with name=%v found in group=%v", ng, n) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -495,7 +495,7 @@ func (c *centralAuth) groupNodesDeleteNode(ng nodeGroup, n Node) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupNodesDeleteNode: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } @@ -506,7 +506,7 @@ func (c *centralAuth) groupNodesDeleteGroup(ng nodeGroup) { defer c.accessLists.schemaMain.mu.Unlock() if _, ok := c.accessLists.schemaMain.NodeGroupMap[ng]; !ok { er := fmt.Errorf("info: no such group found: %v", ng) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -517,7 +517,7 @@ func (c *centralAuth) groupNodesDeleteGroup(ng nodeGroup) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupNodesDeleteGroup: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } @@ -535,7 +535,7 @@ func (c *centralAuth) groupCommandsAddCommand(cg commandGroup, cmd command) { if !strings.HasPrefix(string(cg), "grp_commands_") { er := fmt.Errorf("error: group name do not start with grp_commands_") - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -552,7 +552,7 @@ func (c *centralAuth) groupCommandsAddCommand(cg commandGroup, cmd command) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupCommandsAddCommand: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } @@ -563,7 +563,7 @@ func (c *centralAuth) groupCommandsDeleteCommand(cg commandGroup, cmd command) { defer c.accessLists.schemaMain.mu.Unlock() if _, ok := c.accessLists.schemaMain.CommandGroupMap[cg][cmd]; !ok { er := fmt.Errorf("info: no such command with name=%v found in group=%v", c, cg) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -574,7 +574,7 @@ func (c *centralAuth) groupCommandsDeleteCommand(cg commandGroup, cmd command) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupCommandsDeleteCommand: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } @@ -585,7 +585,7 @@ func (c *centralAuth) groupCommandDeleteGroup(cg commandGroup) { defer c.accessLists.schemaMain.mu.Unlock() if _, ok := c.accessLists.schemaMain.CommandGroupMap[cg]; !ok { er := fmt.Errorf("info: no such group found: %v", cg) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) return } @@ -596,7 +596,7 @@ func (c *centralAuth) groupCommandDeleteGroup(cg commandGroup) { err := c.generateACLsForAllNodes() if err != nil { er := fmt.Errorf("error: groupCommandDeleteGroup: %v", err) - c.errorKernel.logError(er, c.configuration) + c.errorKernel.logError(er) } } diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index 982c21f..31e034f 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -81,7 +81,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { db, err := bolt.Open(databaseFilepath, 0660, nil) if err != nil { er := fmt.Errorf("newPKI: error: failed to open db: %v", err) - errorKernel.logDebug(er, configuration) + errorKernel.logDebug(er) return &p } @@ -91,7 +91,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { keys, err := p.dbDumpPublicKey() if err != nil { er := fmt.Errorf("newPKI: dbPublicKeyDump failed, probably empty db: %v", err) - errorKernel.logDebug(er, configuration) + errorKernel.logDebug(er) } // Only assign from storage to in memory map if the storage contained any values. @@ -99,7 +99,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { p.nodesAcked.keysAndHash.Keys = keys for k, v := range keys { er := fmt.Errorf("newPKI: public keys db contains: %v, %v", k, []byte(v)) - errorKernel.logDebug(er, configuration) + errorKernel.logDebug(er) } } @@ -128,7 +128,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { if ok && bytes.Equal(existingKey, msg.Data) { er := fmt.Errorf("info: public key value for REGISTERED node %v is the same, doing nothing", msg.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return } @@ -147,7 +147,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) c.pki.errorKernel.infoSend(proc, msg, er) - c.pki.errorKernel.logDebug(er, c.pki.configuration) + c.pki.errorKernel.logDebug(er) } // deletePublicKeys to the db if the node do not exist, or if it is a new value. @@ -169,7 +169,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string } er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) c.pki.errorKernel.infoSend(proc, msg, er) } @@ -230,7 +230,7 @@ func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error { err := bu.Delete([]byte(n)) if err != nil { er := fmt.Errorf("error: delete key in bucket %v failed: %v", bucket, err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return er } } @@ -324,14 +324,14 @@ func (p *pki) dbViewHash() ([]byte, error) { bu := tx.Bucket([]byte("hash")) if bu == nil { er := fmt.Errorf("info: no db hash bucket exist") - p.errorKernel.logWarn(er, p.configuration) + p.errorKernel.logWarn(er) return nil } v := bu.Get([]byte("hash")) if len(v) == 0 { er := fmt.Errorf("info: view: hash key not found") - p.errorKernel.logWarn(er, p.configuration) + p.errorKernel.logWarn(er) return nil } diff --git a/cmd/ctrl/main.go b/cmd/ctrl/main.go index 7eda56f..a2d9153 100644 --- a/cmd/ctrl/main.go +++ b/cmd/ctrl/main.go @@ -27,11 +27,11 @@ func main() { //defer profile.Start(profile.MemProfile, profile.MemProfileRate(1)).Stop() c := ctrl.NewConfiguration() - err := c.CheckFlags(version) - if err != nil { - log.Printf("%v\n", err) - return - } + // err := c.CheckFlags(version) + // if err != nil { + // log.Printf("%v\n", err) + // return + // } // Start profiling if profiling port is specified if c.ProfilingPort != "" { diff --git a/configuration_flags.go b/configuration_flags.go index a5c0a4b..8a29353 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -2,12 +2,11 @@ package ctrl import ( "flag" - "fmt" "log" "os" - "path/filepath" + "strconv" - toml "github.com/pelletier/go-toml/v2" + "github.com/joho/godotenv" ) // Configuration are the structure that holds all the different @@ -119,8 +118,6 @@ type Configuration struct { StartSubREQToFileAppend bool `comment:"Start subscriber for text logging"` // Start subscriber for writing to file StartSubREQToFile bool `comment:"Start subscriber for writing to file"` - // Start subscriber for writing to file without ACK - StartSubREQToFileNACK bool `comment:"Start subscriber for writing to file without ACK"` // Start subscriber for reading files to copy StartSubREQCopySrc bool `comment:"Start subscriber for reading files to copy"` // Start subscriber for writing copied files to disk @@ -139,75 +136,86 @@ type Configuration struct { StartSubREQCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."` } -// ConfigurationFromFile should have the same structure as -// Configuration. This structure is used when parsing the -// configuration values from file, so we are able to detect -// if a value were given or not when parsing. -type ConfigurationFromFile struct { - ConfigFolder *string - RingBufferPersistStore *bool - RingBufferSize *int - SocketFolder *string - ReadFolder *string - EnableReadFolder *bool - TCPListener *string - HTTPListener *string - DatabaseFolder *string - NodeName *string - BrokerAddress *string - NatsConnOptTimeout *int - NatsConnectRetryInterval *int - NatsReconnectJitter *int - NatsReconnectJitterTLS *int - REQKeysRequestUpdateInterval *int - REQAclRequestUpdateInterval *int - ProfilingPort *string - PromHostAndPort *string - DefaultMessageTimeout *int - DefaultMessageRetries *int - DefaultMethodTimeout *int - SubscribersDataFolder *string - CentralNodeName *string - RootCAPath *string - NkeySeedFile *string - NkeyFromED25519SSHKeyFile *string - NkeySeed *string - ExposeDataFolder *string - ErrorMessageTimeout *int - ErrorMessageRetries *int - Compression *string - Serialization *string - SetBlockProfileRate *int - EnableSocket *bool - EnableSignatureCheck *bool - EnableAclCheck *bool - IsCentralAuth *bool - EnableDebug *bool - LogLevel *string - LogConsoleTimestamps *bool - KeepPublishersAliveFor *int - - StartPubREQHello *int - EnableKeyUpdates *bool - EnableAclUpdates *bool - IsCentralErrorLogger *bool - StartSubREQHello *bool - StartSubREQToFileAppend *bool - StartSubREQToFile *bool - StartSubREQToFileNACK *bool - StartSubREQCopySrc *bool - StartSubREQCopyDst *bool - StartSubREQCliCommand *bool - StartSubREQToConsole *bool - StartSubREQHttpGet *bool - StartSubREQHttpGetScheduled *bool - StartSubREQTailFile *bool - StartSubREQCliCommandCont *bool -} - // NewConfiguration will return a *Configuration. func NewConfiguration() *Configuration { - c := Configuration{} + c := newConfigurationDefaults() + + err := godotenv.Load() + if err != nil { + log.Printf("Error loading .env file: %v\n", err) + } + + //flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/ctrl/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER") + flag.StringVar(&c.SocketFolder, "socketFolder", CheckEnv("SOCKET_FOLDER", c.SocketFolder).(string), "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.") + flag.StringVar(&c.ReadFolder, "readFolder", CheckEnv("READ_FOLDER", c.ReadFolder).(string), "folder who contains the readfolder. Defaults to ./readfolder/. If other folder is used this flag must be specified at startup.") + flag.StringVar(&c.TCPListener, "tcpListener", CheckEnv("TCP_LISTENER", c.TCPListener).(string), "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") + flag.StringVar(&c.HTTPListener, "httpListener", CheckEnv("HTTP_LISTENER", c.HTTPListener).(string), "start up a HTTP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") + flag.StringVar(&c.DatabaseFolder, "databaseFolder", CheckEnv("DATABASE_FOLDER", c.DatabaseFolder).(string), "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.") + flag.StringVar(&c.NodeName, "nodeName", CheckEnv("NODE_NAME", c.NodeName).(string), "some unique string to identify this Edge unit") + flag.StringVar(&c.BrokerAddress, "brokerAddress", CheckEnv("BROKER_ADDRESS", c.BrokerAddress).(string), "the address of the message broker") + flag.IntVar(&c.NatsConnOptTimeout, "natsConnOptTimeout", CheckEnv("NATS_CONN_OPT_TIMEOUT", c.NatsConnOptTimeout).(int), "default nats client conn timeout in seconds") + flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", CheckEnv("NATS_CONNECT_RETRY_INTERVAL", c.NatsConnectRetryInterval).(int), "default nats retry connect interval in seconds.") + flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", CheckEnv("NATS_RECONNECT_JITTER", c.NatsReconnectJitter).(int), "default nats ReconnectJitter interval in milliseconds.") + flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", CheckEnv("NATS_RECONNECT_JITTER_TLS", c.NatsReconnectJitterTLS).(int), "default nats ReconnectJitterTLS interval in seconds.") + flag.IntVar(&c.REQKeysRequestUpdateInterval, "REQKeysRequestUpdateInterval", CheckEnv("REQ_KEYS_UPDATE_INTERVAL", c.REQKeysRequestUpdateInterval).(int), "default interval in seconds for asking the central for public keys") + flag.IntVar(&c.REQAclRequestUpdateInterval, "REQAclRequestUpdateInterval", CheckEnv("REQ_ACL_REQUEST_UPDATE_INTERVAL", c.REQAclRequestUpdateInterval).(int), "default interval in seconds for asking the central for acl updates") + flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port") + flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112") + flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level") + flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system") + flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run") + flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", CheckEnv("SUBSCRIBER_DATA_FOLDER", c.SubscribersDataFolder).(string), "The data folder where subscribers are allowed to write their data if needed") + flag.StringVar(&c.CentralNodeName, "centralNodeName", CheckEnv("CENTRAL_NODE_NAME", c.CentralNodeName).(string), "The name of the central node to receive messages published by this node") + flag.StringVar(&c.RootCAPath, "rootCAPath", CheckEnv("ROOT_CA_PATH", c.RootCAPath).(string), "If TLS, enter the path for where to find the root CA certificate") + flag.StringVar(&c.NkeyFromED25519SSHKeyFile, "nkeyFromED25519SSHKeyFile", CheckEnv("NKEY_FROM_ED25519_SSH_KEY_FILE", c.NkeyFromED25519SSHKeyFile).(string), "The full path of the nkeys seed file") + flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", CheckEnv("NKEY_SEED_FILE", c.NkeySeedFile).(string), "Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified") + flag.StringVar(&c.NkeySeed, "nkeySeed", CheckEnv("NKEY_SEED", c.NkeySeed).(string), "The actual nkey seed. To use if not stored in file") + flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") + flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out") + flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it") + flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression") + flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob") + flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled") + flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file") + flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.") + flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.") + flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server") + flag.BoolVar(&c.EnableDebug, "enableDebug", CheckEnv("ENABLE_DEBUG", c.EnableDebug).(bool), "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") + flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none") + flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr") + flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish") + + // Start of Request publishers/subscribers + + flag.IntVar(&c.StartPubREQHello, "startPubREQHello", CheckEnv("START_PUB_REQ_HELLO", c.StartPubREQHello).(int), "Make the current node send hello messages to central at given interval in seconds") + + flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.EnableKeyUpdates).(bool), "true/false") + + flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.EnableAclUpdates).(bool), "true/false") + + flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.IsCentralErrorLogger).(bool), "true/false") + flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", CheckEnv("START_SUB_REQ_HELLO", c.StartSubREQHello).(bool), "true/false") + flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", CheckEnv("START_SUB_REQ_TO_FILE_APPEND", c.StartSubREQToFileAppend).(bool), "true/false") + flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", CheckEnv("START_SUB_REQ_TO_FILE", c.StartSubREQToFile).(bool), "true/false") + flag.BoolVar(&c.StartSubREQCopySrc, "startSubREQCopySrc", CheckEnv("START_SUB_REQ_COPY_SRC", c.StartSubREQCopySrc).(bool), "true/false") + flag.BoolVar(&c.StartSubREQCopyDst, "startSubREQCopyDst", CheckEnv("START_SUB_REQ_COPY_DST", c.StartSubREQCopyDst).(bool), "true/false") + flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", CheckEnv("START_SUB_REQ_CLI_COMMAND", c.StartSubREQCliCommand).(bool), "true/false") + flag.BoolVar(&c.StartSubREQToConsole, "startSubREQToConsole", CheckEnv("START_SUB_REQ_TO_CONSOLE", c.StartSubREQToConsole).(bool), "true/false") + flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", CheckEnv("START_SUB_REQ_HTTP_GET", c.StartSubREQHttpGet).(bool), "true/false") + flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", CheckEnv("START_SUB_REQ_HTTP_GET_SCHEDULED", c.StartSubREQHttpGetScheduled).(bool), "true/false") + flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", CheckEnv("START_SUB_REQ_TAIL_FILE", c.StartSubREQTailFile).(bool), "true/false") + flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", CheckEnv("START_SUB_REQ_CLI_COMMAND_CONT", c.StartSubREQCliCommandCont).(bool), "true/false") + + // Check that mandatory flag values have been set. + switch { + case c.NodeName == "": + log.Fatalf("error: the nodeName config option or flag cannot be empty, check -help\n") + case c.CentralNodeName == "": + log.Fatalf("error: the centralNodeName config option or flag cannot be empty, check -help\n") + } + + flag.Parse() + return &c } @@ -262,7 +270,6 @@ func newConfigurationDefaults() Configuration { StartSubREQHello: true, StartSubREQToFileAppend: true, StartSubREQToFile: true, - StartSubREQToFileNACK: true, StartSubREQCopySrc: true, StartSubREQCopyDst: true, StartSubREQCliCommand: true, @@ -275,474 +282,30 @@ func newConfigurationDefaults() Configuration { return c } -// Check if all values are present in config file, and if not -// found use the default value. -func checkConfigValues(cf ConfigurationFromFile) Configuration { - var conf Configuration - cd := newConfigurationDefaults() - - if cf.ConfigFolder == nil { - conf.ConfigFolder = cd.ConfigFolder - } else { - conf.ConfigFolder = *cf.ConfigFolder - } - if cf.SocketFolder == nil { - conf.SocketFolder = cd.SocketFolder - } else { - conf.SocketFolder = *cf.SocketFolder - } - if cf.ReadFolder == nil { - conf.ReadFolder = cd.ReadFolder - } else { - conf.ReadFolder = *cf.ReadFolder - } - if cf.EnableReadFolder == nil { - conf.EnableReadFolder = cd.EnableReadFolder - } else { - conf.EnableReadFolder = *cf.EnableReadFolder - } - if cf.TCPListener == nil { - conf.TCPListener = cd.TCPListener - } else { - conf.TCPListener = *cf.TCPListener - } - if cf.HTTPListener == nil { - conf.HTTPListener = cd.HTTPListener - } else { - conf.HTTPListener = *cf.HTTPListener - } - if cf.DatabaseFolder == nil { - conf.DatabaseFolder = cd.DatabaseFolder - } else { - conf.DatabaseFolder = *cf.DatabaseFolder - } - if cf.NodeName == nil { - conf.NodeName = cd.NodeName - } else { - conf.NodeName = *cf.NodeName - } - if cf.BrokerAddress == nil { - conf.BrokerAddress = cd.BrokerAddress - } else { - conf.BrokerAddress = *cf.BrokerAddress - } - if cf.NatsConnOptTimeout == nil { - conf.NatsConnOptTimeout = cd.NatsConnOptTimeout - } else { - conf.NatsConnOptTimeout = *cf.NatsConnOptTimeout - } - if cf.NatsConnectRetryInterval == nil { - conf.NatsConnectRetryInterval = cd.NatsConnectRetryInterval - } else { - conf.NatsConnectRetryInterval = *cf.NatsConnectRetryInterval - } - if cf.NatsReconnectJitter == nil { - conf.NatsReconnectJitter = cd.NatsReconnectJitter - } else { - conf.NatsReconnectJitter = *cf.NatsReconnectJitter - } - if cf.NatsReconnectJitterTLS == nil { - conf.NatsReconnectJitterTLS = cd.NatsReconnectJitterTLS - } else { - conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS - } - if cf.REQKeysRequestUpdateInterval == nil { - conf.REQKeysRequestUpdateInterval = cd.REQKeysRequestUpdateInterval - } else { - conf.REQKeysRequestUpdateInterval = *cf.REQKeysRequestUpdateInterval - } - if cf.REQAclRequestUpdateInterval == nil { - conf.REQAclRequestUpdateInterval = cd.REQAclRequestUpdateInterval - } else { - conf.REQAclRequestUpdateInterval = *cf.REQAclRequestUpdateInterval - } - if cf.ProfilingPort == nil { - conf.ProfilingPort = cd.ProfilingPort - } else { - conf.ProfilingPort = *cf.ProfilingPort - } - if cf.PromHostAndPort == nil { - conf.PromHostAndPort = cd.PromHostAndPort - } else { - conf.PromHostAndPort = *cf.PromHostAndPort - } - if cf.DefaultMessageTimeout == nil { - conf.DefaultMessageTimeout = cd.DefaultMessageTimeout - } else { - conf.DefaultMessageTimeout = *cf.DefaultMessageTimeout - } - if cf.DefaultMessageRetries == nil { - conf.DefaultMessageRetries = cd.DefaultMessageRetries - } else { - conf.DefaultMessageRetries = *cf.DefaultMessageRetries - } - if cf.DefaultMethodTimeout == nil { - conf.DefaultMethodTimeout = cd.DefaultMethodTimeout - } else { - conf.DefaultMethodTimeout = *cf.DefaultMethodTimeout - } - if cf.SubscribersDataFolder == nil { - conf.SubscribersDataFolder = cd.SubscribersDataFolder - } else { - conf.SubscribersDataFolder = *cf.SubscribersDataFolder - } - if cf.CentralNodeName == nil { - conf.CentralNodeName = cd.CentralNodeName - } else { - conf.CentralNodeName = *cf.CentralNodeName - } - if cf.RootCAPath == nil { - conf.RootCAPath = cd.RootCAPath - } else { - conf.RootCAPath = *cf.RootCAPath - } - if cf.NkeySeedFile == nil { - conf.NkeySeedFile = cd.NkeySeedFile - } else { - conf.NkeySeedFile = *cf.NkeySeedFile - } - if cf.NkeyFromED25519SSHKeyFile == nil { - conf.NkeyFromED25519SSHKeyFile = cd.NkeyFromED25519SSHKeyFile - } else { - conf.NkeyFromED25519SSHKeyFile = *cf.NkeyFromED25519SSHKeyFile - } - if cf.NkeySeed == nil { - conf.NkeySeed = cd.NkeySeed - } else { - conf.NkeySeed = *cf.NkeySeed - } - if cf.ExposeDataFolder == nil { - conf.ExposeDataFolder = cd.ExposeDataFolder - } else { - conf.ExposeDataFolder = *cf.ExposeDataFolder - } - if cf.ErrorMessageTimeout == nil { - conf.ErrorMessageTimeout = cd.ErrorMessageTimeout - } else { - conf.ErrorMessageTimeout = *cf.ErrorMessageTimeout - } - if cf.ErrorMessageRetries == nil { - conf.ErrorMessageRetries = cd.ErrorMessageRetries - } else { - conf.ErrorMessageRetries = *cf.ErrorMessageRetries - } - if cf.Compression == nil { - conf.Compression = cd.Compression - } else { - conf.Compression = *cf.Compression - } - if cf.Serialization == nil { - conf.Serialization = cd.Serialization - } else { - conf.Serialization = *cf.Serialization - } - if cf.SetBlockProfileRate == nil { - conf.SetBlockProfileRate = cd.SetBlockProfileRate - } else { - conf.SetBlockProfileRate = *cf.SetBlockProfileRate - } - if cf.EnableSocket == nil { - conf.EnableSocket = cd.EnableSocket - } else { - conf.EnableSocket = *cf.EnableSocket - } - if cf.EnableSignatureCheck == nil { - conf.EnableSignatureCheck = cd.EnableSignatureCheck - } else { - conf.EnableSignatureCheck = *cf.EnableSignatureCheck - } - if cf.EnableAclCheck == nil { - conf.EnableAclCheck = cd.EnableAclCheck - } else { - conf.EnableAclCheck = *cf.EnableAclCheck - } - if cf.IsCentralAuth == nil { - conf.IsCentralAuth = cd.IsCentralAuth - } else { - conf.IsCentralAuth = *cf.IsCentralAuth - } - if cf.EnableDebug == nil { - conf.EnableDebug = cd.EnableDebug - } else { - conf.EnableDebug = *cf.EnableDebug - } - if cf.LogLevel == nil { - conf.LogLevel = cd.LogLevel - } else { - conf.LogLevel = *cf.LogLevel - } - if cf.LogConsoleTimestamps == nil { - conf.LogConsoleTimestamps = cd.LogConsoleTimestamps - } else { - conf.LogConsoleTimestamps = *cf.LogConsoleTimestamps - } - if cf.KeepPublishersAliveFor == nil { - conf.KeepPublishersAliveFor = cd.KeepPublishersAliveFor - } else { - conf.KeepPublishersAliveFor = *cf.KeepPublishersAliveFor +func CheckEnv[T any](key string, v T) any { + val, ok := os.LookupEnv(key) + if !ok { + return v } - // --- Start pub/sub - - if cf.StartPubREQHello == nil { - conf.StartPubREQHello = cd.StartPubREQHello - } else { - conf.StartPubREQHello = *cf.StartPubREQHello - } - if cf.EnableKeyUpdates == nil { - conf.EnableKeyUpdates = cd.EnableKeyUpdates - } else { - conf.EnableKeyUpdates = *cf.EnableKeyUpdates - } - - if cf.EnableAclUpdates == nil { - conf.EnableAclUpdates = cd.EnableAclUpdates - } else { - conf.EnableAclUpdates = *cf.EnableAclUpdates - } - - if cf.IsCentralErrorLogger == nil { - conf.IsCentralErrorLogger = cd.IsCentralErrorLogger - } else { - conf.IsCentralErrorLogger = *cf.IsCentralErrorLogger - } - if cf.StartSubREQHello == nil { - conf.StartSubREQHello = cd.StartSubREQHello - } else { - conf.StartSubREQHello = *cf.StartSubREQHello - } - if cf.StartSubREQToFileAppend == nil { - conf.StartSubREQToFileAppend = cd.StartSubREQToFileAppend - } else { - conf.StartSubREQToFileAppend = *cf.StartSubREQToFileAppend - } - if cf.StartSubREQToFile == nil { - conf.StartSubREQToFile = cd.StartSubREQToFile - } else { - conf.StartSubREQToFile = *cf.StartSubREQToFile - } - if cf.StartSubREQToFileNACK == nil { - conf.StartSubREQToFileNACK = cd.StartSubREQToFileNACK - } else { - conf.StartSubREQToFileNACK = *cf.StartSubREQToFileNACK - } - if cf.StartSubREQCopySrc == nil { - conf.StartSubREQCopySrc = cd.StartSubREQCopySrc - } else { - conf.StartSubREQCopySrc = *cf.StartSubREQCopySrc - } - if cf.StartSubREQCopyDst == nil { - conf.StartSubREQCopyDst = cd.StartSubREQCopyDst - } else { - conf.StartSubREQCopyDst = *cf.StartSubREQCopyDst - } - if cf.StartSubREQCliCommand == nil { - conf.StartSubREQCliCommand = cd.StartSubREQCliCommand - } else { - conf.StartSubREQCliCommand = *cf.StartSubREQCliCommand - } - if cf.StartSubREQToConsole == nil { - conf.StartSubREQToConsole = cd.StartSubREQToConsole - } else { - conf.StartSubREQToConsole = *cf.StartSubREQToConsole - } - if cf.StartSubREQHttpGet == nil { - conf.StartSubREQHttpGet = cd.StartSubREQHttpGet - } else { - conf.StartSubREQHttpGet = *cf.StartSubREQHttpGet - } - if cf.StartSubREQHttpGetScheduled == nil { - conf.StartSubREQHttpGetScheduled = cd.StartSubREQHttpGetScheduled - } else { - conf.StartSubREQHttpGetScheduled = *cf.StartSubREQHttpGetScheduled - } - if cf.StartSubREQTailFile == nil { - conf.StartSubREQTailFile = cd.StartSubREQTailFile - } else { - conf.StartSubREQTailFile = *cf.StartSubREQTailFile - } - if cf.StartSubREQCliCommandCont == nil { - conf.StartSubREQCliCommandCont = cd.StartSubREQCliCommandCont - } else { - conf.StartSubREQCliCommandCont = *cf.StartSubREQCliCommandCont - } - - return conf -} - -// CheckFlags will parse all flags -func (c *Configuration) CheckFlags(version string) error { - - // Create an empty default config - var fc Configuration - - // Set default configfolder if no env was provided. - configFolder := os.Getenv("CONFIG_FOLDER") - - if configFolder == "" { - configFolder = "./etc/" - } - - // Read file config. Set system default if it can't find config file. - fc, err := c.ReadConfigFile(configFolder) - if err != nil { - log.Printf("%v\n", err) - fc = newConfigurationDefaults() - } - - if configFolder == "" { - fc.ConfigFolder = "./etc/" - } else { - fc.ConfigFolder = configFolder - } - - *c = fc - - //flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/ctrl/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER") - flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.") - flag.StringVar(&c.ReadFolder, "readFolder", fc.ReadFolder, "folder who contains the readfolder. Defaults to ./readfolder/. If other folder is used this flag must be specified at startup.") - flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") - flag.StringVar(&c.HTTPListener, "httpListener", fc.HTTPListener, "start up a HTTP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") - flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.") - flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit") - flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker") - flag.IntVar(&c.NatsConnOptTimeout, "natsConnOptTimeout", fc.NatsConnOptTimeout, "default nats client conn timeout in seconds") - flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.") - flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.") - flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.") - flag.IntVar(&c.REQKeysRequestUpdateInterval, "REQKeysRequestUpdateInterval", fc.REQKeysRequestUpdateInterval, "default interval in seconds for asking the central for public keys") - flag.IntVar(&c.REQAclRequestUpdateInterval, "REQAclRequestUpdateInterval", fc.REQAclRequestUpdateInterval, "default interval in seconds for asking the central for acl updates") - flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") - flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") - flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") - flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", fc.DefaultMessageRetries, "default amount of retries that will be done before a message is thrown away, and out of the system") - flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", fc.DefaultMethodTimeout, "default amount of seconds a request method max will be allowed to run") - flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed") - flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node") - flag.StringVar(&c.RootCAPath, "rootCAPath", fc.RootCAPath, "If TLS, enter the path for where to find the root CA certificate") - flag.StringVar(&c.NkeyFromED25519SSHKeyFile, "nkeyFromED25519SSHKeyFile", fc.NkeyFromED25519SSHKeyFile, "The full path of the nkeys seed file") - flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", fc.NkeySeedFile, "Full path to the ED25519 SSH private key. Will generate the NKEY Seed from an SSH ED25519 private key file. NB: This option will take precedence over NkeySeedFile if specified") - flag.StringVar(&c.NkeySeed, "nkeySeed", fc.NkeySeed, "The actual nkey seed. To use if not stored in file") - flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") - flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out") - flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it") - flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression") - flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob") - flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", fc.SetBlockProfileRate, "Enable block profiling by setting the value to f.ex. 1. 0 = disabled") - flag.BoolVar(&c.EnableSocket, "enableSocket", fc.EnableSocket, "true/false, for enabling the creation of ctrl.sock file") - flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", fc.EnableSignatureCheck, "true/false *TESTING* enable signature checking.") - flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", fc.EnableAclCheck, "true/false *TESTING* enable Acl checking.") - flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server") - flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") - flag.StringVar(&c.LogLevel, "logLevel", fc.LogLevel, "error/info/warning/debug/none") - flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", fc.LogConsoleTimestamps, "true/false for enabling or disabling timestamps when printing errors and information to stderr") - flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", fc.KeepPublishersAliveFor, "The amount of time we allow a publisher to stay alive without receiving any messages to publish") - - // Start of Request publishers/subscribers - - flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds") - - flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", fc.EnableKeyUpdates, "true/false") - - flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", fc.EnableAclUpdates, "true/false") - - flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", fc.IsCentralErrorLogger, "true/false") - flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") - flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") - flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false") - flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false") - flag.BoolVar(&c.StartSubREQCopySrc, "startSubREQCopySrc", fc.StartSubREQCopySrc, "true/false") - flag.BoolVar(&c.StartSubREQCopyDst, "startSubREQCopyDst", fc.StartSubREQCopyDst, "true/false") - flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false") - flag.BoolVar(&c.StartSubREQToConsole, "startSubREQToConsole", fc.StartSubREQToConsole, "true/false") - flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", fc.StartSubREQHttpGet, "true/false") - flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", fc.StartSubREQHttpGetScheduled, "true/false") - flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false") - flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false") - - purgeBufferDB := flag.Bool("purgeBufferDB", false, "true/false, purge the incoming buffer db and all it's state") - - ver := flag.Bool("version", false, "print version and exit") - - flag.Parse() - - if *ver { - fmt.Printf("version %v\n", version) - os.Exit(0) - } - - // Check that mandatory flag values have been set. - switch { - case c.NodeName == "": - return fmt.Errorf("error: the nodeName config option or flag cannot be empty, check -help") - case c.CentralNodeName == "": - return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help") - } - - if err := c.WriteConfigFile(); err != nil { - log.Printf("error: checkFlags: failed writing config file: %v\n", err) - os.Exit(1) - } - - if *purgeBufferDB { - fp := filepath.Join(c.DatabaseFolder, "incomingBuffer.db") - err := os.Remove(fp) + switch any(v).(type) { + case int: + n, err := strconv.Atoi(val) if err != nil { - log.Printf("error: failed to purge buffer state database: %v\n", err) + log.Fatalf("error: failed to convert env to int: %v\n", n) + } + return n + case string: + return val + case bool: + switch { + case val == "true" || val == "1": + return true + case val == "false" || val == "0" || val == "": + return true } } return nil } - -// Reads the current config file from disk. -func (c *Configuration) ReadConfigFile(configFolder string) (Configuration, error) { - fPath := filepath.Join(configFolder, "config.toml") - - if _, err := os.Stat(fPath); os.IsNotExist(err) { - return Configuration{}, fmt.Errorf("error: no config file found %v: %v", fPath, err) - } - - f, err := os.OpenFile(fPath, os.O_RDONLY, 0660) - if err != nil { - return Configuration{}, fmt.Errorf("error: ReadConfigFile: failed to open file: %v", err) - } - defer f.Close() - - var cFile ConfigurationFromFile - dec := toml.NewDecoder(f) - err = dec.Decode(&cFile) - if err != nil { - log.Printf("error: decoding config.toml file. The program will automatically try to correct the problem, and use sane default where it kind find a value to use, but beware of this error in case the program start to behave in not expected ways: path=%v: err=%v", fPath, err) - } - - // Check that all values read are ok. - conf := checkConfigValues(cFile) - - return conf, nil -} - -// WriteConfigFile will write the current config to file. If the file or the -// directory for the config file does not exist it will be created. -func (c *Configuration) WriteConfigFile() error { - if _, err := os.Stat(c.ConfigFolder); os.IsNotExist(err) { - err := os.MkdirAll(c.ConfigFolder, 0770) - if err != nil { - return fmt.Errorf("error: failed to create config directory %v: %v", c.ConfigFolder, err) - } - } - - fp := filepath.Join(c.ConfigFolder, "config.toml") - - f, err := os.OpenFile(fp, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660) - if err != nil { - return fmt.Errorf("error: WriteConfigFile: failed to open file: %v", err) - } - defer f.Close() - - enc := toml.NewEncoder(f) - enc.Encode(c) - - return nil -} diff --git a/doc/nats-server-relay-configuration/README.md b/doc/nats-server-relay-configuration/README.md deleted file mode 100644 index c221533..0000000 --- a/doc/nats-server-relay-configuration/README.md +++ /dev/null @@ -1,49 +0,0 @@ -# Relay configuration - -Example config where we allow **ship1** to send a message to **ship2** relayed via **central**. - -```json -port: 4222 - -authorization: { - users = [ - { - # central - nkey: - permissions: { - publish: { - allow: [">","errorCentral.>"] - } - subscribe: { - allow: [">","errorCentral.>"] - } - } - } - { - # ship1 - nkey: - permissions: { - publish: { - allow: ["central.>","errorCentral.>","ship1.>","ship2.central.REQRelay.EventACK"] - # deny: ["*.REQRelay.>"] - } - subscribe: { - allow: ["central.>","ship1.>","errorCentral.REQErrorLog.EventACK.reply","*.central.REQRelay.EventACK.reply"] - } - } - } - { - # ship2 - nkey: - permissions: { - publish: { - allow: ["central.>","errorCentral.>","ship2.>"] - } - subscribe: { - allow: ["central.>","ship2.>","errorCentral.REQErrorLog.EventACK.reply"] - } - } - } - ] -} -``` diff --git a/errorkernel.go b/errorkernel.go index eee8e0b..e8acc06 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -240,6 +240,12 @@ func (e errorEvent) Error() string { } // errSend will just send an error message to the errorCentral. +// As input arguments it takes: +// +// The process where the error was generated. +// A message, where this can be an Message{} if you don't want to log the message +// or an actual message. +// The error, and a logLevel. func (e *errorKernel) errSend(proc process, msg Message, err error, logLevel logLevel) { ev := errorEvent{ err: err, @@ -270,26 +276,26 @@ func (e *errorKernel) infoSend(proc process, msg Message, err error) { e.errorCh <- ev } -func (e *errorKernel) logError(err error, c *Configuration) { - if c.LogLevel == string(logError) { +func (e *errorKernel) logError(err error) { + if e.configuration.LogLevel == string(logError) { slog.Error("error", err) } } -func (e *errorKernel) logInfo(err error, c *Configuration) { - if c.LogLevel == string(logInfo) { +func (e *errorKernel) logInfo(err error) { + if e.configuration.LogLevel == string(logInfo) { slog.Info(err.Error()) } } -func (e *errorKernel) logWarn(err error, c *Configuration) { - if c.LogLevel == string(logWarning) { +func (e *errorKernel) logWarn(err error) { + if e.configuration.LogLevel == string(logWarning) { slog.Warn(err.Error()) } } -func (e *errorKernel) logDebug(err error, c *Configuration) { - if c.LogLevel == string(logDebug) { +func (e *errorKernel) logDebug(err error) { + if e.configuration.LogLevel == string(logDebug) { slog.Debug(err.Error()) } } diff --git a/go.mod b/go.mod index 30a3ae9..a20fbeb 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,11 @@ require ( github.com/google/uuid v1.3.0 github.com/hpcloud/tail v1.0.0 github.com/jinzhu/copier v0.4.0 + github.com/joho/godotenv v1.5.1 github.com/klauspost/compress v1.17.0 github.com/nats-io/nats-server/v2 v2.8.4 github.com/nats-io/nats.go v1.25.0 github.com/nats-io/nkeys v0.4.4 - github.com/pelletier/go-toml/v2 v2.0.7 github.com/pkg/profile v1.7.0 github.com/prometheus/client_golang v1.14.0 go.etcd.io/bbolt v1.3.7 diff --git a/go.sum b/go.sum index d12d25c..06329a6 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -68,8 +70,6 @@ github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/pelletier/go-toml/v2 v2.0.7 h1:muncTPStnKRos5dpVKULv2FVd4bMOhNePj9CjgDb8Us= -github.com/pelletier/go-toml/v2 v2.0.7/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= @@ -88,7 +88,6 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/message_and_subject.go b/message_and_subject.go index 25a13c9..7cff482 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -84,14 +84,6 @@ type Message struct { PreviousMessage *Message // Schedule Schedule []int `json:"schedule" yaml:"schedule"` - - // ctx for the specifix message. Used for for example canceling - // scheduled messages. - // NB: Commented out this field for specific message context - // to be used within handlers, since it will override the structure - // we have today. Keeping the code for a bit incase it makes sense - // to implement later. - //ctx context.Context } // --- Subject diff --git a/message_readers.go b/message_readers.go index d1f3745..dad8f9c 100644 --- a/message_readers.go +++ b/message_readers.go @@ -41,12 +41,12 @@ func (s *server) readStartupFolder() { for _, fp := range filePaths { er := fmt.Errorf("info: ranging filepaths, current filePath contains: %v", fp) - s.errorKernel.logInfo(er, s.configuration) + s.errorKernel.logInfo(er) } for _, filePath := range filePaths { er := fmt.Errorf("info: reading and working on file from startup folder %v", filePath) - s.errorKernel.logInfo(er, s.configuration) + s.errorKernel.logInfo(er) // Read the content of each file. readBytes, err := func(filePath string) ([]byte, error) { @@ -225,7 +225,7 @@ func (s *server) readFolder() { err := os.MkdirAll(s.configuration.ReadFolder, 0770) if err != nil { er := fmt.Errorf("error: failed to create readfolder folder: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } } @@ -233,7 +233,7 @@ func (s *server) readFolder() { watcher, err := fsnotify.NewWatcher() if err != nil { er := fmt.Errorf("main: failed to create new logWatcher: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } @@ -248,7 +248,7 @@ func (s *server) readFolder() { if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod { er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) func() { fh, err := os.Open(event.Name) @@ -290,7 +290,7 @@ func (s *server) readFolder() { } er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams) - s.errorKernel.errSend(s.processInitial, Message{}, er, logDebug) + s.errorKernel.logDebug(er) // Send the SAM struct to be picked up by the ring buffer. s.samToSendCh <- sams @@ -321,7 +321,7 @@ func (s *server) readFolder() { err = watcher.Add(s.configuration.ReadFolder) if err != nil { er := fmt.Errorf("startLogsWatcher: failed to add watcher: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } } @@ -334,7 +334,7 @@ func (s *server) readTCPListener() { ln, err := net.Listen("tcp", s.configuration.TCPListener) if err != nil { er := fmt.Errorf("error: readTCPListener: failed to start tcp listener: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } // Loop, and wait for new connections. @@ -441,7 +441,7 @@ func (s *server) readHttpListener() { n, err := net.Listen("tcp", s.configuration.HTTPListener) if err != nil { er := fmt.Errorf("error: startMetrics: failed to open prometheus listen port: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } mux := http.NewServeMux() @@ -450,7 +450,7 @@ func (s *server) readHttpListener() { err = http.Serve(n, mux) if err != nil { er := fmt.Errorf("error: startMetrics: failed to start http.Serve: %v", err) - s.errorKernel.logError(er, s.configuration) + s.errorKernel.logError(er) os.Exit(1) } }() diff --git a/node_auth.go b/node_auth.go index d2644d2..0bf953e 100644 --- a/node_auth.go +++ b/node_auth.go @@ -55,7 +55,7 @@ func newNodeAuth(configuration *Configuration, errorKernel *errorKernel) *nodeAu err := n.loadSigningKeys() if err != nil { er := fmt.Errorf("newNodeAuth: %v", err) - errorKernel.logError(er, configuration) + errorKernel.logError(er) os.Exit(1) } @@ -97,7 +97,7 @@ func newNodeAcl(c *Configuration, errorKernel *errorKernel) *nodeAcl { err := n.loadFromFile() if err != nil { er := fmt.Errorf("error: newNodeAcl: loading acl's from file: %v", err) - errorKernel.logError(er, c) + errorKernel.logError(er) // os.Exit(1) } @@ -112,7 +112,7 @@ func (n *nodeAcl) loadFromFile() error { // Just logging the error since it is not crucial that a key file is missing, // since a new one will be created on the next update. er := fmt.Errorf("acl: loadFromFile: no acl file found at %v", n.filePath) - n.errorKernel.logDebug(er, n.configuration) + n.errorKernel.logDebug(er) return nil } @@ -135,7 +135,7 @@ func (n *nodeAcl) loadFromFile() error { } er := fmt.Errorf("nodeAcl: loadFromFile: Loaded existing acl's from file: %v", n.aclAndHash.Hash) - n.errorKernel.logDebug(er, n.configuration) + n.errorKernel.logDebug(er) return nil } @@ -203,7 +203,7 @@ func newPublicKeys(c *Configuration, errorKernel *errorKernel) *publicKeys { err := p.loadFromFile() if err != nil { er := fmt.Errorf("error: newPublicKeys: loading public keys from file: %v", err) - errorKernel.logError(er, c) + errorKernel.logError(er) // os.Exit(1) } @@ -218,7 +218,7 @@ func (p *publicKeys) loadFromFile() error { // Just logging the error since it is not crucial that a key file is missing, // since a new one will be created on the next update. er := fmt.Errorf("no public keys file found at %v, new file will be created", p.filePath) - p.errorKernel.logInfo(er, p.configuration) + p.errorKernel.logInfo(er) return nil } @@ -241,7 +241,7 @@ func (p *publicKeys) loadFromFile() error { } er := fmt.Errorf("nodeAuth: loadFromFile: Loaded existing keys from file: %v", p.keysAndHash.Hash) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } @@ -322,7 +322,7 @@ func (n *nodeAuth) loadSigningKeys() error { n.SignPrivateKey = priv er := fmt.Errorf("info: no signing keys found, generating new keys") - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) // We got the new generated keys now, so we can return. return nil @@ -394,7 +394,7 @@ func (n *nodeAuth) verifySignature(m Message) bool { // NB: Only enable signature checking for REQCliCommand for now. if m.Method != REQCliCommand { er := fmt.Errorf("verifySignature: not REQCliCommand and will not do signature check, method: %v", m.Method) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return true } @@ -417,11 +417,11 @@ func (n *nodeAuth) verifySignature(m Message) bool { }() if err != nil { - n.errorKernel.logError(err, n.configuration) + n.errorKernel.logError(err) } er := fmt.Errorf("info: verifySignature, result: %v, fromNode: %v, method: %v", ok, m.FromNode, m.Method) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return ok } @@ -431,7 +431,7 @@ func (n *nodeAuth) verifyAcl(m Message) bool { // NB: Only enable acl checking for REQCliCommand for now. if m.Method != REQCliCommand { er := fmt.Errorf("verifyAcl: not REQCliCommand and will not do acl check, method: %v", m.Method) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return true } @@ -444,26 +444,26 @@ func (n *nodeAuth) verifyAcl(m Message) bool { cmdMap, ok := n.nodeAcl.aclAndHash.Acl[m.FromNode] if !ok { er := fmt.Errorf("verifyAcl: The fromNode=%v was not found in the acl", m.FromNode) - n.errorKernel.logError(er, n.configuration) + n.errorKernel.logError(er) return false } _, ok = cmdMap[command("*")] if ok { er := fmt.Errorf("verifyAcl: The acl said \"*\", all commands allowed from node=%v", m.FromNode) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return true } _, ok = cmdMap[command(argsStringified)] if !ok { er := fmt.Errorf("verifyAcl: The command=%v was NOT FOUND in the acl", m.MethodArgs) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return false } er := fmt.Errorf("verifyAcl: the command was FOUND in the acl, verifyAcl, result: %v, fromNode: %v, method: %v", ok, m.FromNode, m.Method) - n.errorKernel.logInfo(er, n.configuration) + n.errorKernel.logInfo(er) return true } diff --git a/process.go b/process.go index 811f141..59f6cf2 100644 --- a/process.go +++ b/process.go @@ -48,7 +48,6 @@ type process struct { // also one subject subject Subject // Put a node here to be able know the node a process is at. - // NB: Might not be needed later on. node Node // The processID for the current process processID int @@ -180,8 +179,6 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin // process to the processes map in the server structure. func (p process) spawnWorker() { - // processName := processNameGet(p.subject.name(), p.processKind) - // Add prometheus metrics for the process. if !p.isSubProcess { p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)}) @@ -205,7 +202,7 @@ func (p process) spawnWorker() { p.processes.active.mu.Unlock() er := fmt.Errorf("successfully started process: %v", p.processName) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } func (p process) startPublisher() { @@ -257,7 +254,7 @@ func (p process) startSubscriber() { if err != nil { er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err) p.errorKernel.errSend(p, Message{}, er, logError) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } p.processes.active.mu.Lock() @@ -265,7 +262,7 @@ func (p process) startSubscriber() { p.processes.active.mu.Unlock() er := fmt.Errorf("successfully stopped process: %v", p.processName) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) }() } @@ -299,7 +296,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He } er := fmt.Errorf("info: preparing to send nats message with subject %v, id: %v", msg.Subject, message.ID) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) var err error @@ -311,7 +308,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He err := natsConn.PublishMsg(msg) if err != nil { er := fmt.Errorf("error: nats publish for message with subject failed: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return ErrACKSubscribeRetry } p.metrics.promNatsDeliveredTotal.Inc() @@ -346,7 +343,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He } er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) // The SubscribeSync used in the subscriber, will get messages that // are sent after it started subscribing. @@ -357,14 +354,14 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He err := subReply.Unsubscribe() if err != nil { er := fmt.Errorf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } }() if err != nil { er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) er = fmt.Errorf("%v, waiting equal to RetryWait %ds before retrying", er, message.RetryWait) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) time.Sleep(time.Second * time.Duration(message.RetryWait)) @@ -376,7 +373,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He if err != nil { er := fmt.Errorf("error: nats publish failed: %v, waiting equal to RetryWait of %ds before retrying", err, message.RetryWait) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) time.Sleep(time.Second * time.Duration(message.RetryWait)) return ErrACKSubscribeRetry @@ -394,7 +391,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He switch { case err == nats.ErrNoResponders || err == nats.ErrTimeout: er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) time.Sleep(time.Second * time.Duration(message.RetryWait)) p.metrics.promNatsMessagesMissedACKsTotal.Inc() @@ -403,13 +400,13 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return er default: er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", p.subject.name(), err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return er } @@ -434,7 +431,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.metrics.promNatsDeliveredTotal.Inc() er = fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return } @@ -462,7 +459,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // If debugging is enabled, print the source node name of the nats messages received. if val, ok := msg.Header["fromNode"]; ok { er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } // If compression is used, decompress it to get the gob data. If @@ -567,7 +564,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // Check for ACK type Event. case message.ACKTimeout >= 1: er := fmt.Errorf("subscriberHandler: received ACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) // When spawning sub processes we can directly assign handlers to the process upon // creation. We here check if a handler is already assigned, and if it is nil, we // lookup and find the correct handler to use if available. @@ -592,7 +589,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, case message.ACKTimeout < 1: er := fmt.Errorf("subscriberHandler: received NACK message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) // When spawning sub processes we can directly assign handlers to the process upon // creation. We here check if a handler is already assigned, and if it is nil, we // lookup and find the correct handler to use if available. @@ -635,7 +632,7 @@ func (p process) callHandler(message Message, thisNode string) []byte { // ACL/Signature checking failed. er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing") p.errorKernel.errSend(p, message, er, logWarning) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } }() @@ -665,7 +662,7 @@ func executeHandler(p process, message Message, thisNode string) { if p.configuration.EnableAclCheck { // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } switch { @@ -676,7 +673,7 @@ func executeHandler(p process, message Message, thisNode string) { if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) p.errorKernel.errSend(p, message, er, logError) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } }() @@ -687,20 +684,13 @@ func executeHandler(p process, message Message, thisNode string) { defer intervalTicker.Stop() defer totalTimeTicker.Stop() - // NB: Commented out this assignement of a specific message context - // to be used within handlers, since it will override the structure - // we have today. Keeping the code for a bit incase it makes sense - // to implement later. - //ctx, cancel := context.WithCancel(p.ctx) - //message.ctx = ctx - // Run the handler once, so we don't have to wait for the first ticker. go func() { _, err := p.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) p.errorKernel.errSend(p, message, er, logError) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } }() @@ -708,7 +698,7 @@ func executeHandler(p process, message Message, thisNode string) { select { case <-p.ctx.Done(): er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) //cancel() return @@ -716,7 +706,7 @@ func executeHandler(p process, message Message, thisNode string) { // Total time reached. End the process. //cancel() er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return @@ -726,7 +716,7 @@ func executeHandler(p process, message Message, thisNode string) { if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) p.errorKernel.errSend(p, message, er, logError) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } }() } @@ -754,7 +744,7 @@ func (p process) verifySigOrAclFlag(message Message) bool { sigOK := p.nodeAuth.verifySignature(message) er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig: Only signature checking enabled, ALLOW the message if sigOK, sigOK=%v, method %v", sigOK, message.Method) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) if sigOK { doHandler = true @@ -766,7 +756,7 @@ func (p process) verifySigOrAclFlag(message Message) bool { aclOK := p.nodeAuth.verifyAcl(message) er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig:both signature and acl checking enabled, allow the message if sigOK and aclOK, or method is not REQCliCommand, sigOK=%v, aclOK=%v, method=%v", sigOK, aclOK, message.Method) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) if sigOK && aclOK { doHandler = true @@ -776,7 +766,7 @@ func (p process) verifySigOrAclFlag(message Message) bool { // of doHandler=false, so the handler is not done. default: er := fmt.Errorf("verifySigOrAclFlag: verify acl/sig: None of the verify flags matched, not doing handler for message, method=%v", message.Method) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) } return doHandler @@ -796,7 +786,7 @@ func (p process) subscribeMessages() *nats.Subscription { }) if err != nil { er := fmt.Errorf("error: Subscribe failed: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } @@ -819,7 +809,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) if err != nil { er := fmt.Errorf("error: zstd new encoder failed: %v", err) - p.errorKernel.logError(er, p.configuration) + p.errorKernel.logError(er) os.Exit(1) } zEnc = enc @@ -843,7 +833,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { if p.isLongRunningPublisher { er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) continue } @@ -858,7 +848,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { er := fmt.Errorf("info: canceled publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return //} @@ -873,7 +863,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return } } @@ -903,7 +893,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, b, err := cbor.Marshal(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return } @@ -916,7 +906,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, err := gobEnc.Encode(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return } @@ -955,7 +945,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, _, err := gzipW.Write(natsMsgPayloadSerialized) if err != nil { er := fmt.Errorf("error: failed to write gzip: %v", err) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return } @@ -971,11 +961,11 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, default: // no compression // Allways log the error to console. er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression") - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) // We only wan't to send the error message to errorCentral once. once.Do(func() { - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) }) // No compression, so we just assign the value of the serialized diff --git a/processes.go b/processes.go index b4ac3eb..0458128 100644 --- a/processes.go +++ b/processes.go @@ -105,10 +105,6 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, REQToFile, nil) } - if proc.configuration.StartSubREQToFileNACK { - proc.startup.subscriber(proc, REQToFileNACK, nil) - } - if proc.configuration.StartSubREQCopySrc { proc.startup.subscriber(proc, REQCopySrc, nil) } @@ -137,7 +133,7 @@ func (p *processes) Start(proc process) { case <-ctx.Done(): er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name()) // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } @@ -202,7 +198,7 @@ func (p *processes) Start(proc process) { case <-ctx.Done(): er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } } @@ -225,7 +221,7 @@ func (p *processes) Start(proc process) { proc.nodeAuth.publicKeys.mu.Lock() er := fmt.Errorf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) m := Message{ FileName: "publickeysget.log", @@ -252,7 +248,7 @@ func (p *processes) Start(proc process) { case <-ctx.Done(): er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } } @@ -273,7 +269,7 @@ func (p *processes) Start(proc process) { proc.nodeAuth.nodeAcl.mu.Lock() er := fmt.Errorf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:])) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) m := Message{ FileName: "aclRequestUpdate.log", @@ -301,7 +297,7 @@ func (p *processes) Start(proc process) { case <-ctx.Done(): er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) return nil } } @@ -376,10 +372,10 @@ func newStartup(server *server) *startup { } // subscriber will start a subscriber process. It takes the initial process, request method, -// and a procFunc as it's input arguments. If a procFunc os not needed, use the value nil. +// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil. func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { er := fmt.Errorf("starting %v subscriber: %#v", m, p.node) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) var sub Subject switch { @@ -398,7 +394,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { er := fmt.Errorf("starting %v publisher: %#v", m, p.node) - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) sub := newSubject(m, string(p.node)) proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher) proc.procFunc = pf @@ -412,14 +408,14 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr // Print the content of the processes map. func (p *processes) printProcessesMap() { er := fmt.Errorf("output of processes map : ") - p.errorKernel.logDebug(er, p.configuration) + p.errorKernel.logDebug(er) { p.active.mu.Lock() for pName, proc := range p.active.procNames { er := fmt.Errorf("info: proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v", proc.processKind, pName, proc.processID, proc.subject.name()) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) diff --git a/requests.go b/requests.go index 6900aaf..514cb0a 100644 --- a/requests.go +++ b/requests.go @@ -95,8 +95,6 @@ const ( // The data field is a slice of strings where the values of the // slice will be written to the file. REQToFile Method = "REQToFile" - // REQToFileNACK same as REQToFile but NACK. - REQToFileNACK Method = "REQToFileNACK" // Initiated by the user. REQCopySrc Method = "REQCopySrc" // Initial request for file copying. @@ -184,7 +182,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQToConsole: HandlerFunc(methodREQToConsole), REQToFileAppend: HandlerFunc(methodREQToFileAppend), REQToFile: HandlerFunc(methodREQToFile), - REQToFileNACK: HandlerFunc(methodREQToFile), REQCopySrc: HandlerFunc(methodREQCopySrc), REQCopyDst: HandlerFunc(methodREQCopyDst), REQSUBCopySrc: HandlerFunc(methodREQSUB), @@ -406,7 +403,3 @@ func selectFileNaming(message Message, proc process) (string, string) { return fileName, folderTree } - -// ------------------------------------------------------------ -// Subscriber method handlers -// ------------------------------------------------------------ diff --git a/requests_acl.go b/requests_acl.go index 9ba1b36..c730b45 100644 --- a/requests_acl.go +++ b/requests_acl.go @@ -12,8 +12,8 @@ import ( // Handler to get all acl's from a central server. func methodREQAclRequestUpdate(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- subscriber methodREQAclRequestUpdate received from: %v, hash data = %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- subscriber methodREQAclRequestUpdate received from: %v, hash data = %v", message.FromNode, message.Data) + proc.errorKernel.logDebug(er) // fmt.Printf("\n --- subscriber methodREQAclRequestUpdate: the message brought to handler : %+v\n", message) @@ -45,22 +45,22 @@ func methodREQAclRequestUpdate(proc process, message Message, node string) ([]by defer proc.centralAuth.accessLists.schemaGenerated.mu.Unlock() er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: got acl hash from NODE=%v, HASH data =%v", message.FromNode, message.Data) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // Check if the received hash is the same as the one currently active, // If it is the same we exit the handler immediately. hash32 := proc.centralAuth.accessLists.schemaGenerated.GeneratedACLsMap[message.FromNode].Hash hash := hash32[:] er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: the central acl hash=%v", hash32) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) if bytes.Equal(hash, message.Data) { er := fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAVE EQUAL ACL HASH, NOTHING TO DO, EXITING HANDLER") - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return } er = fmt.Errorf("info: subscriber methodREQAclRequestUpdate: NODE AND CENTRAL HAD NOT EQUAL ACL, PREPARING TO SEND NEW VERSION OF Acl") - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // Generate JSON for Message.Data @@ -77,14 +77,14 @@ func methodREQAclRequestUpdate(proc process, message Message, node string) ([]by } er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) newReplyMessage(proc, message, js) }() } }() - // NB: We're not sending an ACK message for this request type. + // We're not sending an ACK message for this request type. return nil, nil } @@ -93,7 +93,7 @@ func methodREQAclRequestUpdate(proc process, message Message, node string) ([]by // Handler to receive the acls from a central server. func methodREQAclDeliverUpdate(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- subscriber methodREQAclDeliverUpdate received from: %v, containing: %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + proc.errorKernel.logDebug(inf) // fmt.Printf("\n --- subscriber methodREQAclRequestUpdate: the message received on handler : %+v\n\n", message) @@ -169,8 +169,8 @@ func methodREQAclDeliverUpdate(proc process, message Message, node string) ([]by // --- func methodREQAclAddCommand(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -231,8 +231,8 @@ func methodREQAclAddCommand(proc process, message Message, node string) ([]byte, // --- func methodREQAclDeleteCommand(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -293,8 +293,8 @@ func methodREQAclDeleteCommand(proc process, message Message, node string) ([]by // --- func methodREQAclDeleteSource(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclDeleteSource received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclDeleteSource received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -354,8 +354,8 @@ func methodREQAclDeleteSource(proc process, message Message, node string) ([]byt // --- func methodREQAclGroupNodesAddNode(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupNodesAddNode received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupNodesAddNode received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -415,8 +415,8 @@ func methodREQAclGroupNodesAddNode(proc process, message Message, node string) ( // --- func methodREQAclGroupNodesDeleteNode(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupNodesDeleteNode received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupNodesDeleteNode received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -476,8 +476,8 @@ func methodREQAclGroupNodesDeleteNode(proc process, message Message, node string // --- func methodREQAclGroupNodesDeleteGroup(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupNodesDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupNodesDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -536,8 +536,8 @@ func methodREQAclGroupNodesDeleteGroup(proc process, message Message, node strin // --- func methodREQAclGroupCommandsAddCommand(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupCommandsAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupCommandsAddCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -597,8 +597,8 @@ func methodREQAclGroupCommandsAddCommand(proc process, message Message, node str // --- func methodREQAclGroupCommandsDeleteCommand(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteCommand received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -658,8 +658,8 @@ func methodREQAclGroupCommandsDeleteCommand(proc process, message Message, node // --- func methodREQAclGroupCommandsDeleteGroup(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclGroupCommandsDeleteGroup received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -718,8 +718,8 @@ func methodREQAclGroupCommandsDeleteGroup(proc process, message Message, node st // --- func methodREQAclExport(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclExport received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclExport received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -774,8 +774,8 @@ func methodREQAclExport(proc process, message Message, node string) ([]byte, err // --- func methodREQAclImport(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- methodREQAclImport received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- methodREQAclImport received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { diff --git a/requests_cli.go b/requests_cli.go index a33342b..02aeea2 100644 --- a/requests_cli.go +++ b/requests_cli.go @@ -13,8 +13,8 @@ import ( // return the output of the command run back to the calling publisher // as a new message. func methodREQCliCommand(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logDebug(er) msgForErrors := message msgForErrors.FileName = msgForErrors.FileName + ".error" @@ -128,8 +128,8 @@ func methodREQCliCommand(proc process, message Message, node string) ([]byte, er // longer time and you want to send the output of the command continually // back as it is generated, and not just when the command is finished. func methodREQCliCommandCont(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + proc.errorKernel.logDebug(er) msgForErrors := message msgForErrors.FileName = msgForErrors.FileName + ".error" diff --git a/requests_copy.go b/requests_copy.go index 3d43a8b..e93275c 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -123,7 +123,7 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error folderPermission := uint64(0755) er := fmt.Errorf("info: before switch: FolderPermission defined in message for socket: %04o", folderPermission) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // Verify and check the methodArgs if len(message.MethodArgs) < 3 { @@ -158,11 +158,11 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error folderPermission, err = strconv.ParseUint(message.MethodArgs[5], 8, 32) if err != nil { er := fmt.Errorf("methodREQCopySrc: failed to parse uint, %v", err) - proc.errorKernel.logError(er, proc.configuration) + proc.errorKernel.logError(er) } er := fmt.Errorf("info: FolderPermission defined in message for socket: %v, converted = %v", message.MethodArgs[5], folderPermission) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) if err != nil { er := fmt.Errorf("error: methodREQCopySrc: unable to convert folderPermission into int value: %v", err) proc.errorKernel.errSend(proc, message, er, logWarning) @@ -200,7 +200,7 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error if err != nil { // errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err) er := fmt.Errorf("error: copySrcSubProcFunc: failed to stat file: %v", err) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return } @@ -324,7 +324,7 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error // the processName. If true, return here and don't start up another // process for that file. // - // NB: This check is put in here if a message for some reason are + // This check is put in here if a message for some reason are // received more than once. The reason that this might happen can be // that a message for the same copy request was received earlier, but // was unable to start up within the timeout specified. The Sender of @@ -342,7 +342,7 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error if ok { er := fmt.Errorf("methodREQCopyDst: subprocesses already existed, will not start another subscriber for %v", pn) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // HERE!!! // If the process name already existed we return here before any @@ -384,10 +384,10 @@ func copySrcSubHandler() func(process, Message, string) ([]byte, error) { select { case <-proc.ctx.Done(): er := fmt.Errorf(" * copySrcHandler ended: %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) case proc.procFuncCh <- message: er := fmt.Errorf("copySrcHandler: passing message over to procFunc: %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } return nil, nil @@ -402,10 +402,10 @@ func copyDstSubHandler() func(process, Message, string) ([]byte, error) { select { case <-proc.ctx.Done(): er := fmt.Errorf(" * copyDstHandler ended: %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) case proc.procFuncCh <- message: er := fmt.Errorf("copyDstHandler: passing message over to procFunc: %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } return nil, nil @@ -486,7 +486,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel select { case <-ctx.Done(): er := fmt.Errorf(" info: canceling copySrcProcFunc : %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return nil // Pick up the message recived by the copySrcSubHandler. @@ -723,7 +723,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc select { case <-ctx.Done(): er := fmt.Errorf(" * copyDstProcFunc ended: %v", proc.processName) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return nil case message := <-procFuncCh: var csa copySubData @@ -739,7 +739,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc hash := sha256.Sum256(csa.CopyData) if hash != csa.Hash { er := fmt.Errorf("error: copyDstSubProcFunc: hash of received message is not correct for: %v", cia.DstMethod) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) csa.CopyStatus = copyResendLast } @@ -844,7 +844,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // HERE: er := fmt.Errorf("info: Before creating folder: cia.FolderPermission: %04o", cia.FolderPermission) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) if _, err := os.Stat(cia.DstDir); os.IsNotExist(err) { // TODO: Add option to set permission here ??? @@ -853,7 +853,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return fmt.Errorf("error: failed to create destination directory for file copying %v: %v", cia.DstDir, err) } er := fmt.Errorf("info: Created folder: with cia.FolderPermission: %04o", cia.FolderPermission) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } // Rename the file so we got a backup. @@ -952,7 +952,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc } er = fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // Signal back to src that we are done, so it can cancel the process. { diff --git a/requests_file_handling.go b/requests_file_handling.go index 51c2be5..b676f24 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -26,7 +26,7 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error { fi, err := os.Stat(file) if err == nil && !os.IsNotExist(err) { er := fmt.Errorf("info: reqWriteFileOrSocket: failed to stat file, but will continue: %v", folderTree) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } if fi != nil && fi.Mode().Type() == fs.ModeSocket { @@ -56,7 +56,7 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error { } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } var fileFlag int @@ -115,7 +115,7 @@ func methodREQToFile(proc process, message Message, node string) ([]byte, error) // as a new message. func methodREQTailFile(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + proc.errorKernel.logDebug(inf) proc.processes.wg.Add(1) go func() { diff --git a/requests_http.go b/requests_http.go index 26cc1b2..a3661d8 100644 --- a/requests_http.go +++ b/requests_http.go @@ -11,8 +11,8 @@ import ( // handler to do a Http Get. func methodREQHttpGet(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) + proc.errorKernel.logDebug(er) msgForErrors := message msgForErrors.FileName = msgForErrors.FileName + ".error" @@ -112,8 +112,8 @@ func methodREQHttpGet(proc process, message Message, node string) ([]byte, error // handler to do a Http Get Scheduled. // The second element of the MethodArgs slice holds the timer defined in seconds. func methodREQHttpGetScheduled(proc process, message Message, node string) ([]byte, error) { - inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data) - proc.errorKernel.logDebug(inf, proc.configuration) + er := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data) + proc.errorKernel.logDebug(er) proc.processes.wg.Add(1) go func() { @@ -152,7 +152,7 @@ func methodREQHttpGetScheduled(proc process, message Message, node string) ([]by ticker := time.NewTicker(time.Second * time.Duration(scheduleInterval)) // Prepare a context that will be for the schedule as a whole. - // NB: Individual http get's will create their own context's + // Individual http get's will create their own context's // derived from this one. ctxScheduler, cancel := context.WithTimeout(proc.ctx, time.Minute*time.Duration(schedulerTotalTime)) diff --git a/requests_keys.go b/requests_keys.go index 1baef5d..a399a27 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -86,21 +86,21 @@ func methodREQKeysRequestUpdate(proc process, message Message, node string) ([]b defer proc.centralAuth.pki.nodesAcked.mu.Unlock() er := fmt.Errorf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v", message.FromNode, message.Data) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // Check if the received hash is the same as the one currently active, if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) { er := fmt.Errorf("info: methodREQKeysRequestUpdate: node %v and central have equal keys, nothing to do, exiting key update handler", message.FromNode) // proc.errorKernel.infoSend(proc, message, er) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) return } er = fmt.Errorf("info: methodREQKeysRequestUpdate: node %v and central had not equal keys, preparing to send new version of keys", message.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) er = fmt.Errorf("info: methodREQKeysRequestUpdate: marshalling new keys and hash to send: map=%v, hash=%v", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash) @@ -109,13 +109,13 @@ func methodREQKeysRequestUpdate(proc process, message Message, node string) ([]b proc.errorKernel.errSend(proc, message, er, logWarning) } er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) newReplyMessage(proc, message, b) }() } }() - // NB: We're not sending an ACK message for this request type. + // We're not sending an ACK message for this request type. return nil, nil } @@ -163,7 +163,7 @@ func methodREQKeysDeliverUpdate(proc process, message Message, node string) ([]b } er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // If the received map was empty we also want to delete all the locally stored keys, // else we copy the marshaled keysAndHash we received from central into our map. @@ -264,7 +264,7 @@ func methodREQKeysAllow(proc process, message Message, node string) ([]byte, err // If new keys were allowed into the main map, we should send out one // single update to all the registered nodes to inform of an update. - // NB: If a node is not reachable at the time the update is sent it is + // If a node is not reachable at the time the update is sent it is // not a problem since the nodes will periodically check for updates. // // If there are errors we will return from the function, and send no @@ -292,7 +292,7 @@ func methodREQKeysAllow(proc process, message Message, node string) ([]byte, err func pushKeys(proc process, message Message, nodes []Node) error { er := fmt.Errorf("info: beginning of pushKeys, nodes=%v", nodes) var knh []byte - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) err := func() error { proc.centralAuth.pki.nodesAcked.mu.Lock() @@ -319,7 +319,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { // For all nodes that is not ack'ed we try to send an update once. for n := range proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap { er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) msg := Message{ ToNode: n, Method: REQKeysDeliverUpdate, @@ -337,7 +337,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.toRingbufferCh <- []subjectAndMessage{sam} er = fmt.Errorf("----> methodREQKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } // Create the data payload of the current allowed keys. @@ -365,7 +365,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { // For all nodes that is ack'ed we try to send an update once. for n := range nodeMap { er := fmt.Errorf("info: node to send REQKeysDeliverUpdate to:%v ", n) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) msg := Message{ ToNode: n, Method: REQKeysDeliverUpdate, @@ -384,7 +384,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.toRingbufferCh <- []subjectAndMessage{sam} er = fmt.Errorf("----> methodREQKeysAllow: sending keys update to node=%v", message.FromNode) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } return nil @@ -393,7 +393,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { func methodREQKeysDelete(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- methodREQKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.errorKernel.logDebug(inf, proc.configuration) + proc.errorKernel.logDebug(inf) proc.processes.wg.Add(1) go func() { @@ -423,13 +423,13 @@ func methodREQKeysDelete(proc process, message Message, node string) ([]byte, er proc.centralAuth.deletePublicKeys(proc, message, message.MethodArgs) er := fmt.Errorf("info: Deleted public keys: %v", message.MethodArgs) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) // All new elements are now added, and we can create a new hash // representing the current keys in the allowed map. proc.centralAuth.updateHash(proc, message) er = fmt.Errorf(" * DEBUG updated hash for public keys") - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) var nodes []Node diff --git a/requests_std.go b/requests_std.go index e09bca7..8513d2d 100644 --- a/requests_std.go +++ b/requests_std.go @@ -25,7 +25,7 @@ func methodREQHello(proc process, message Message, node string) ([]byte, error) } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } // Open file and write data. @@ -74,7 +74,7 @@ func methodREQErrorLog(proc process, message Message, node string) ([]byte, erro } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.errorKernel.logDebug(er, proc.configuration) + proc.errorKernel.logDebug(er) } // Open file and write data. diff --git a/scripts/mini-steward/test.sh b/scripts/mini-steward/test.sh index 946c7e8..393d973 100755 --- a/scripts/mini-steward/test.sh +++ b/scripts/mini-steward/test.sh @@ -64,7 +64,7 @@ fi cat >$systemctlFile <${progName} service -Documentation=https://github.com/RaaLabs/steward +Documentation=https://github.com/postmannen/ctrl After=network-online.target nss-lookup.target Requires=network-online.target nss-lookup.target diff --git a/server.go b/server.go index bb56ef9..073d3ff 100644 --- a/server.go +++ b/server.go @@ -242,7 +242,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { } er := fmt.Errorf("info: creating subscribers data folder at %v", configuration.SubscribersDataFolder) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) } return &s, nil @@ -335,7 +335,7 @@ func (s *server) Start() { // Since all the logic to handle processes are tied to the process // struct, we need to create an initial process to start the rest. // - // NB: The context of the initial process are set in processes.Start. + // The context of the initial process are set in processes.Start. sub := newSubject(REQInitial, s.nodeName) s.processInitial = newProcess(context.TODO(), s, sub, "") // Start all wanted subscriber processes. @@ -351,8 +351,6 @@ func (s *server) Start() { } // Start the processing of new messages from an input channel. - // NB: We might need to create a sub context for the ringbuffer here - // so we can cancel this context last, and not use the server. s.routeMessagesToProcess() // Start reading the channel for injecting direct messages that should @@ -538,7 +536,7 @@ func (s *server) routeMessagesToProcess() { } if ok && ctxCanceled { er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) delete(proc.processes.active.procNames, proc.processName) return false } @@ -549,10 +547,10 @@ func (s *server) routeMessagesToProcess() { select { case proc.subject.messageCh <- m: er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) case <-proc.ctx.Done(): er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) } return true @@ -568,7 +566,7 @@ func (s *server) routeMessagesToProcess() { } er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) var proc process @@ -581,17 +579,17 @@ func (s *server) routeMessagesToProcess() { proc.spawnWorker() er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) // Now when the process is spawned we continue, // and send the message to that new process. select { case proc.subject.messageCh <- m: er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) case <-proc.ctx.Done(): er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) - s.errorKernel.logDebug(er, s.configuration) + s.errorKernel.logDebug(er) } }(sam)