diff --git a/configuration_flags.go b/configuration_flags.go index 4898a30..92e03d5 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -50,6 +50,8 @@ type Configuration struct { ProfilingPort string `comment:"The number of the profiling port"` // Host and port for prometheus listener, e.g. localhost:2112 PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"` + // Comma separated list of additional streams to consume from. + JetstreamsConsume string // Set to true if this is the node that should receive the error log's from other nodes DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"` // Default value for how long can a request method max be allowed to run in seconds @@ -165,6 +167,7 @@ func NewConfiguration() *Configuration { flag.IntVar(&c.AclUpdateInterval, "aclUpdateInterval", CheckEnv("ACL_UPDATE_INTERVAL", c.AclUpdateInterval).(int), "default interval in seconds for asking the central for acl updates") flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112") + flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system") flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run") @@ -242,6 +245,7 @@ func newConfigurationDefaults() Configuration { AclUpdateInterval: 60, ProfilingPort: "", PromHostAndPort: "", + JetstreamsConsume: "", DefaultMessageTimeout: 10, DefaultMessageRetries: 1, DefaultMethodTimeout: 10, diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md index a029824..dd0b383 100644 --- a/doc/src/SUMMARY.md +++ b/doc/src/SUMMARY.md @@ -11,6 +11,7 @@ - [Messaging](./core_messaging_overview.md) - [Message fields](./core_messaging_message_fields.md) +- [Message jetstream/broadcast](./core_messaging_jetstream.md) - [Request Methods](./core_request_methods.md) - [Nats timeouts](./core_nats_timeouts.md) - [Startup folder](./core_startup_folder.md) diff --git a/doc/src/core_messaging_jetstream.md b/doc/src/core_messaging_jetstream.md new file mode 100644 index 0000000..51d8e66 --- /dev/null +++ b/doc/src/core_messaging_jetstream.md @@ -0,0 +1,62 @@ +# Jetstream + +ctrl takes benefit of some of the streaming features of NATS Jetstream. In short, Jetstream will keep the state of the message queues on the nats-server, whereas with normal ctrl messaging the state of all the messages are kept on and is the responsibility of the node publishing the message. + +With Jetstream some cool posibilities with ctrl become possible. A couple of examples are: + +- Use ctrl in a github runner, and make messages available to be consumed even after the runner have stopped. +- Broadcast message to all nodes. + +## General use + +To use Jetstream instead of regular NATS messaging, put the **node name** of the node that is supposed to consume the message in the **jetstreamToNode** field. + +```yaml +--- +- toNodes: + - mynode1 + jetstreamToNode: mynode1 + method: cliCommand + methodArgs: + - /bin/bash + - -c + - | + tree + methodTimeout: 3 + replyMethod: console + ACKTimeout: 0 +``` + +The request will then be published with NATS Jetstream to all nodes registered on the nats-server. The reply with the result will be sent back as a normal NATS message (not Jetstream). + +## Broadcast + +A message can be broadcasted to all nodes by using the value **all** with jetstreamToNode field of the message like the example below. + +```yaml +--- +- toNodes: + - all + jetstreamToNode: all + method: cliCommand + methodArgs: + - /bin/bash + - -c + - | + tree + methodTimeout: 3 + replyMethod: console + ACKTimeout: 0 +``` + +The request will then be published with NATS Jetstream to all nodes registered on the nats-server. The reply with the result will be sent back as a normal NATS message (not Jetstream). + +## Specify more subjects to consume with streams + +More subject can be specified by using the flag `jetstreamsConsume` or env variable `JETSTREAMS_CONSUME` . + +Example: + +```env +JETSTREAMS_CONSUME=updates,restart,indreostfold +``` diff --git a/errorkernel.go b/errorkernel.go index e0d231b..110e709 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -68,7 +68,7 @@ const logNone logLevel = "none" // process if it should continue or not based not based on how severe // the error where. This should be right after sending the error // sending in the process. -func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error { +func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error { // Initiate the slog logger. var replaceFunc func(groups []string, a slog.Attr) slog.Attr if !e.configuration.LogConsoleTimestamps { @@ -137,7 +137,7 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error } // Put the message on the channel to the ringbuffer. - ringBufferBulkInCh <- []subjectAndMessage{sam} + ringBufferBulkInCh <- sam // if errEvent.process.configuration.EnableDebug { // log.Printf("%v\n", er) diff --git a/go.mod b/go.mod index a20fbeb..0ed565c 100644 --- a/go.mod +++ b/go.mod @@ -3,21 +3,21 @@ module github.com/postmannen/ctrl go 1.22 require ( - github.com/fsnotify/fsnotify v1.6.0 + github.com/fsnotify/fsnotify v1.8.0 github.com/fxamacker/cbor/v2 v2.5.0 github.com/go-playground/validator/v10 v10.10.1 github.com/google/uuid v1.3.0 github.com/hpcloud/tail v1.0.0 github.com/jinzhu/copier v0.4.0 github.com/joho/godotenv v1.5.1 - github.com/klauspost/compress v1.17.0 + github.com/klauspost/compress v1.17.2 github.com/nats-io/nats-server/v2 v2.8.4 - github.com/nats-io/nats.go v1.25.0 - github.com/nats-io/nkeys v0.4.4 + github.com/nats-io/nats.go v1.37.0 + github.com/nats-io/nkeys v0.4.7 github.com/pkg/profile v1.7.0 github.com/prometheus/client_golang v1.14.0 go.etcd.io/bbolt v1.3.7 - golang.org/x/crypto v0.7.0 + golang.org/x/crypto v0.18.0 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 gopkg.in/yaml.v3 v3.0.1 ) @@ -39,8 +39,8 @@ require ( github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect diff --git a/go.sum b/go.sum index 06329a6..e20d6aa 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE= github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= @@ -45,6 +47,8 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -65,9 +69,13 @@ github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBri github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -102,6 +110,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -116,14 +126,21 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/message_and_subject.go b/message_and_subject.go index 7cff482..c28073a 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -21,6 +21,9 @@ type Message struct { // With other words, a message that exists within ctrl is always // for just for a single node. ToNodes []Node `json:"toNodes,omitempty" yaml:"toNodes,omitempty"` + // JetstreamToNode, the topic used to prefix the stream name with + // with the format NODES. . + JetstreamToNode Node `json:"jetstreamToNode" yaml:"jetstreamToNode"` // The Unique ID of the message ID int `json:"id" yaml:"id"` // The actual data in the message. This is typically where we diff --git a/message_readers.go b/message_readers.go index 4e80b43..65c257f 100644 --- a/message_readers.go +++ b/message_readers.go @@ -10,8 +10,11 @@ import ( "net/http" "os" "path/filepath" + "strings" + "time" "github.com/fsnotify/fsnotify" + "github.com/nats-io/nats.go/jetstream" "gopkg.in/yaml.v3" ) @@ -119,6 +122,114 @@ func (s *server) readStartupFolder() { } +func (s *server) jetstreamPublish() { + // Create a JetStream management interface + js, _ := jetstream.New(s.natsConn) + + // Create a stream + _, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{ + Name: "NODES", + Subjects: []string{"NODES.>"}, + // TODO: Create Flag ? + MaxMsgsPerSubject: 100, + // MaxMsgsPerSubject: 1, + }) + + // Publish messages. + for { + select { + case jsMSG := <-s.jetstreamPublishCh: + b, err := json.Marshal(jsMSG) + if err != nil { + log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err) + } + + subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode)) + _, err = js.Publish(s.ctx, subject, b) + if err != nil { + log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err) + } + + fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG) + case <-s.ctx.Done(): + } + } +} + +func (s *server) jetstreamConsume() { + // Create a JetStream management interface + js, _ := jetstream.New(s.natsConn) + + // Create a stream + stream, err := js.CreateOrUpdateStream(s.ctx, jetstream.StreamConfig{ + Name: "NODES", + Subjects: []string{"NODES.>"}, + }) + if err != nil { + log.Printf("error: jetstreamConsume: failed to create stream: %v\n", err) + } + + // The standard streams we want to consume. + filterSubjectValues := []string{ + fmt.Sprintf("NODES.%v", s.nodeName), + "NODES.all", + } + + // Check if there are more to consume defined in flags/env. + if s.configuration.JetstreamsConsume != "" { + splitValues := strings.Split(s.configuration.JetstreamsConsume, ",") + for _, v := range splitValues { + filterSubjectValues = append(filterSubjectValues, fmt.Sprintf("NODES.%v", v)) + } + } + + er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues) + s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) + + cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{ + Name: s.nodeName, + Durable: s.nodeName, + FilterSubjects: filterSubjectValues, + }) + if err != nil { + log.Fatalf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v\n", err) + } + + consumeContext, _ := cons.Consume(func(msg jetstream.Msg) { + er := fmt.Errorf("jetstreamConsume: jetstream msg received: subject %q, data: %q", msg.Subject(), string(msg.Data())) + s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) + + msg.Ack() + + m := Message{} + err := json.Unmarshal(msg.Data(), &m) + if err != nil { + er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err) + s.errorKernel.errSend(s.processInitial, Message{}, er, logError) + return + } + + // From here it is the normal message logic that applies, and since messages received + // via jetstream are to be handled by the node it was consumed we set the current + // nodeName of the consumer in the ctrl Message, so we are sure it is handled locally. + m.ToNode = Node(s.nodeName) + + sam, err := newSubjectAndMessage(m) + if err != nil { + er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err) + s.errorKernel.errSend(s.processInitial, Message{}, er, logError) + return + } + + // If a message is received via + s.samSendLocalCh <- []subjectAndMessage{sam} + }) + defer consumeContext.Stop() + + <-s.ctx.Done() + +} + // getFilePaths will get the names of all the messages in // the folder specified from current working directory. func (s *server) getFilePaths(dirName string) ([]string, error) { @@ -208,10 +319,12 @@ func (s *server) readSocket() { // for auditing. er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) + + s.newMessagesCh <- sams[i] } // Send the SAM struct to be picked up by the ring buffer. - s.newMessagesCh <- sams + s.auditLogCh <- sams }(conn) @@ -246,7 +359,8 @@ func (s *server) readFolder() { return } - if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod { + if event.Op == fsnotify.Create || event.Op == fsnotify.Write { + time.Sleep(time.Millisecond * 250) er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op) s.errorKernel.logDebug(er) @@ -267,6 +381,8 @@ func (s *server) readFolder() { } fh.Close() + fmt.Printf("------- DEBUG: %v\n", b) + b = bytes.Trim(b, "\x00") // unmarshal the JSON into a struct @@ -287,13 +403,24 @@ func (s *server) readFolder() { // for auditing. er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message) s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) + + // Check if it is a message to publish with Jetstream. + if sams[i].Message.JetstreamToNode != "" { + + s.jetstreamPublishCh <- sams[i].Message + er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams) + s.errorKernel.logDebug(er) + + continue + } + + s.newMessagesCh <- sams[i] + + er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams) + s.errorKernel.logDebug(er) } - er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams) - s.errorKernel.logDebug(er) - // Send the SAM struct to be picked up by the ring buffer. - s.newMessagesCh <- sams s.auditLogCh <- sams // Delete the file. @@ -383,10 +510,10 @@ func (s *server) readTCPListener() { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. sams[i].Message.FromNode = Node(s.nodeName) + s.newMessagesCh <- sams[i] } // Send the SAM struct to be picked up by the ring buffer. - s.newMessagesCh <- sams s.auditLogCh <- sams }(conn) @@ -428,10 +555,10 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. sams[i].Message.FromNode = Node(s.nodeName) + s.newMessagesCh <- sams[i] } // Send the SAM struct to be picked up by the ring buffer. - s.newMessagesCh <- sams s.auditLogCh <- sams } diff --git a/process.go b/process.go index 058ddb5..7dd04c9 100644 --- a/process.go +++ b/process.go @@ -88,7 +88,7 @@ type process struct { // copy of the configuration from server configuration *Configuration // The new messages channel copied from *Server - newMessagesCh chan<- []subjectAndMessage + newMessagesCh chan<- subjectAndMessage // The structure who holds all processes information processes *processes // nats connection @@ -508,6 +508,9 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, message := Message{} + // TODO: Jetstream + // Use CBOR and Compression for all messages, and drop the use of the header fields. + // Check if serialization is specified. // Will default to gob serialization if nothing or non existing value is specified. if val, ok := msg.Header["serial"]; ok { diff --git a/processes.go b/processes.go index 49745b7..469af34 100644 --- a/processes.go +++ b/processes.go @@ -191,7 +191,7 @@ func (p *processes) Start(proc process) { er := fmt.Errorf("error: ProcessesStart: %v", err) p.errorKernel.errSend(proc, m, er, logError) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam select { case <-ticker.C: @@ -241,7 +241,7 @@ func (p *processes) Start(proc process) { // In theory the system should drop the message before it reaches here. p.errorKernel.errSend(proc, m, err, logError) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam select { case <-ticker.C: @@ -290,7 +290,7 @@ func (p *processes) Start(proc process) { p.errorKernel.errSend(proc, m, err, logError) log.Printf("error: ProcessesStart: %v\n", err) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam select { case <-ticker.C: diff --git a/requests.go b/requests.go index 505236c..9fde69c 100644 --- a/requests.go +++ b/requests.go @@ -359,7 +359,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { proc.errorKernel.errSend(proc, message, er, logError) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam } // selectFileNaming will figure out the correct naming of the file diff --git a/requests_copy.go b/requests_copy.go index 8733b80..6fcb94b 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -92,7 +92,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message) } @@ -267,7 +267,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { cancel() } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName) @@ -566,7 +566,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam resendRetries = 0 @@ -636,7 +636,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam resendRetries++ @@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam } // Open a tmp folder for where to write the received chunks @@ -801,7 +801,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam case copyResendLast: // The csa already contains copyStatus copyResendLast when reached here, @@ -834,7 +834,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam case copySrcDone: err := func() error { @@ -988,7 +988,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam } cancel() diff --git a/requests_keys.go b/requests_keys.go index 5062976..e222fb5 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -334,7 +334,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) proc.errorKernel.logDebug(er) @@ -381,7 +381,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.newMessagesCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- sam er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode) proc.errorKernel.logDebug(er) diff --git a/requests_test.go b/requests_test.go index c6c7e9e..391355c 100644 --- a/requests_test.go +++ b/requests_test.go @@ -289,7 +289,7 @@ func TestRequest(t *testing.T) { t.Fatalf("newSubjectAndMessage failed: %v\n", err) } - tstSrv.newMessagesCh <- []subjectAndMessage{sam} + tstSrv.newMessagesCh <- sam case viaSocket: msgs := []Message{tt.message} diff --git a/server.go b/server.go index fe5d294..37ed884 100644 --- a/server.go +++ b/server.go @@ -48,9 +48,11 @@ type server struct { // // In general the ringbuffer will read this // channel, unfold each slice, and put single messages on the buffer. - newMessagesCh chan []subjectAndMessage + newMessagesCh chan subjectAndMessage // directSAMSCh samSendLocalCh chan []subjectAndMessage + // Channel for messages to publish with Jetstream. + jetstreamPublishCh chan Message // errorKernel is doing all the error handling like what to do if // an error occurs. errorKernel *errorKernel @@ -211,21 +213,22 @@ func NewServer(configuration *Configuration, version string) (*server, error) { //} s := server{ - ctx: ctx, - cancel: cancel, - configuration: configuration, - nodeName: configuration.NodeName, - natsConn: conn, - ctrlSocket: ctrlSocket, - newMessagesCh: make(chan []subjectAndMessage), - samSendLocalCh: make(chan []subjectAndMessage), - metrics: metrics, - version: version, - errorKernel: errorKernel, - nodeAuth: nodeAuth, - helloRegister: newHelloRegister(), - centralAuth: centralAuth, - auditLogCh: make(chan []subjectAndMessage), + ctx: ctx, + cancel: cancel, + configuration: configuration, + nodeName: configuration.NodeName, + natsConn: conn, + ctrlSocket: ctrlSocket, + newMessagesCh: make(chan subjectAndMessage), + samSendLocalCh: make(chan []subjectAndMessage), + jetstreamPublishCh: make(chan Message), + metrics: metrics, + version: version, + errorKernel: errorKernel, + nodeAuth: nodeAuth, + helloRegister: newHelloRegister(), + centralAuth: centralAuth, + auditLogCh: make(chan []subjectAndMessage), } s.processes = newProcesses(ctx, &s) @@ -344,6 +347,10 @@ func (s *server) Start() { time.Sleep(time.Second * 1) s.processes.printProcessesMap() + // Start Jetstream publisher and consumer. + go s.jetstreamPublish() + go s.jetstreamConsume() + // Start exposing the the data folder via HTTP if flag is set. if s.configuration.ExposeDataFolder != "" { log.Printf("info: Starting expose of data folder via HTTP\n") @@ -485,115 +492,118 @@ func (s *server) routeMessagesToProcess() { methodsAvailable := method.GetMethodsAvailable() go func() { - for samSlice := range s.newMessagesCh { - for _, sam := range samSlice { + for sam := range s.newMessagesCh { - go func(sam subjectAndMessage) { - s.messageID.mu.Lock() - s.messageID.id++ - sam.Message.ID = s.messageID.id - s.messageID.mu.Unlock() + go func(sam subjectAndMessage) { + // TODO: Jetstream + // Check if Jetstream stream are specified, + // and send off to Jetstream publisher. - s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID)) + s.messageID.mu.Lock() + s.messageID.id++ + sam.Message.ID = s.messageID.id + s.messageID.mu.Unlock() - // Check if the format of the message is correct. - if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { - er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er, logError) - return - } + s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID)) - switch { - case sam.Message.Retries < 0: - sam.Message.Retries = s.configuration.DefaultMessageRetries - } - if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 { - sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout - } + // Check if the format of the message is correct. + if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { + er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) + s.errorKernel.errSend(s.processInitial, sam.Message, er, logError) + return + } - // --- + switch { + case sam.Message.Retries < 0: + sam.Message.Retries = s.configuration.DefaultMessageRetries + } + if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 { + sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout + } - m := sam.Message + // --- - subjName := sam.Subject.name() - pn := processNameGet(subjName, processKindPublisher) + m := sam.Message - sendOK := func() bool { - var ctxCanceled bool + subjName := sam.Subject.name() + pn := processNameGet(subjName, processKindPublisher) - s.processes.active.mu.Lock() - defer s.processes.active.mu.Unlock() + sendOK := func() bool { + var ctxCanceled bool - // Check if the process exist, if it do not exist return false so a - // new publisher process will be created. - proc, ok := s.processes.active.procNames[pn] - if !ok { - return false - } + s.processes.active.mu.Lock() + defer s.processes.active.mu.Unlock() - if proc.ctx.Err() != nil { - ctxCanceled = true - } - if ok && ctxCanceled { - er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName) - s.errorKernel.logDebug(er) - delete(proc.processes.active.procNames, proc.processName) - return false - } - - // If found in map above, and go routine for publishing is running, - // put the message on that processes incomming message channel. - if ok && !ctxCanceled { - select { - case proc.subject.messageCh <- m: - er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er) - case <-proc.ctx.Done(): - er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) - s.errorKernel.logDebug(er) - } - - return true - } - - // The process was not found, so we return false here so a new publisher - // process will be created later. + // Check if the process exist, if it do not exist return false so a + // new publisher process will be created. + proc, ok := s.processes.active.procNames[pn] + if !ok { return false - }() - - if sendOK { - return } - er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName) - s.errorKernel.logDebug(er) - - sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - var proc process - switch { - case m.IsSubPublishedMsg: - proc = newSubProcess(s.ctx, s, sub, processKindPublisher) - default: - proc = newProcess(s.ctx, s, sub, processKindPublisher) + if proc.ctx.Err() != nil { + ctxCanceled = true } - - proc.spawnWorker() - er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) - s.errorKernel.logDebug(er) - - // Now when the process is spawned we continue, - // and send the message to that new process. - select { - case proc.subject.messageCh <- m: - er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er) - case <-proc.ctx.Done(): - er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) + if ok && ctxCanceled { + er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName) s.errorKernel.logDebug(er) + delete(proc.processes.active.procNames, proc.processName) + return false } - }(sam) - } + // If found in map above, and go routine for publishing is running, + // put the message on that processes incomming message channel. + if ok && !ctxCanceled { + select { + case proc.subject.messageCh <- m: + er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName) + s.errorKernel.logDebug(er) + case <-proc.ctx.Done(): + er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) + s.errorKernel.logDebug(er) + } + + return true + } + + // The process was not found, so we return false here so a new publisher + // process will be created later. + return false + }() + + if sendOK { + return + } + + er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName) + s.errorKernel.logDebug(er) + + sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) + var proc process + switch { + case m.IsSubPublishedMsg: + proc = newSubProcess(s.ctx, s, sub, processKindPublisher) + default: + proc = newProcess(s.ctx, s, sub, processKindPublisher) + } + + proc.spawnWorker() + er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) + s.errorKernel.logDebug(er) + + // Now when the process is spawned we continue, + // and send the message to that new process. + select { + case proc.subject.messageCh <- m: + er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName) + s.errorKernel.logDebug(er) + case <-proc.ctx.Done(): + er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) + s.errorKernel.logDebug(er) + } + + }(sam) + } }() }