1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

add NODES.all as jetstream subject, and updated doc

added handling of consumed jetstream messages, and fixed a read error with readFolder which sometimes read empty content

newMessagesCh now takes single sam instead of []sams

added jetstream publish channel
This commit is contained in:
postmannen 2024-11-27 08:34:49 +01:00
parent 6c615591a6
commit cdf660aa07
15 changed files with 367 additions and 140 deletions

View file

@ -50,6 +50,8 @@ type Configuration struct {
ProfilingPort string `comment:"The number of the profiling port"`
// Host and port for prometheus listener, e.g. localhost:2112
PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"`
// Comma separated list of additional streams to consume from.
JetstreamsConsume string
// Set to true if this is the node that should receive the error log's from other nodes
DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"`
// Default value for how long can a request method max be allowed to run in seconds
@ -165,6 +167,7 @@ func NewConfiguration() *Configuration {
flag.IntVar(&c.AclUpdateInterval, "aclUpdateInterval", CheckEnv("ACL_UPDATE_INTERVAL", c.AclUpdateInterval).(int), "default interval in seconds for asking the central for acl updates")
flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port")
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112")
flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from")
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level")
flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system")
flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run")
@ -242,6 +245,7 @@ func newConfigurationDefaults() Configuration {
AclUpdateInterval: 60,
ProfilingPort: "",
PromHostAndPort: "",
JetstreamsConsume: "",
DefaultMessageTimeout: 10,
DefaultMessageRetries: 1,
DefaultMethodTimeout: 10,

View file

@ -11,6 +11,7 @@
- [Messaging](./core_messaging_overview.md)
- [Message fields](./core_messaging_message_fields.md)
- [Message jetstream/broadcast](./core_messaging_jetstream.md)
- [Request Methods](./core_request_methods.md)
- [Nats timeouts](./core_nats_timeouts.md)
- [Startup folder](./core_startup_folder.md)

View file

@ -0,0 +1,62 @@
# Jetstream
ctrl takes benefit of some of the streaming features of NATS Jetstream. In short, Jetstream will keep the state of the message queues on the nats-server, whereas with normal ctrl messaging the state of all the messages are kept on and is the responsibility of the node publishing the message.
With Jetstream some cool posibilities with ctrl become possible. A couple of examples are:
- Use ctrl in a github runner, and make messages available to be consumed even after the runner have stopped.
- Broadcast message to all nodes.
## General use
To use Jetstream instead of regular NATS messaging, put the **node name** of the node that is supposed to consume the message in the **jetstreamToNode** field.
```yaml
---
- toNodes:
- mynode1
jetstreamToNode: mynode1
method: cliCommand
methodArgs:
- /bin/bash
- -c
- |
tree
methodTimeout: 3
replyMethod: console
ACKTimeout: 0
```
The request will then be published with NATS Jetstream to all nodes registered on the nats-server. The reply with the result will be sent back as a normal NATS message (not Jetstream).
## Broadcast
A message can be broadcasted to all nodes by using the value **all** with jetstreamToNode field of the message like the example below.
```yaml
---
- toNodes:
- all
jetstreamToNode: all
method: cliCommand
methodArgs:
- /bin/bash
- -c
- |
tree
methodTimeout: 3
replyMethod: console
ACKTimeout: 0
```
The request will then be published with NATS Jetstream to all nodes registered on the nats-server. The reply with the result will be sent back as a normal NATS message (not Jetstream).
## Specify more subjects to consume with streams
More subject can be specified by using the flag `jetstreamsConsume` or env variable `JETSTREAMS_CONSUME` .
Example:
```env
JETSTREAMS_CONSUME=updates,restart,indreostfold
```

View file

@ -68,7 +68,7 @@ const logNone logLevel = "none"
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error {
func (e *errorKernel) start(ringBufferBulkInCh chan<- subjectAndMessage) error {
// Initiate the slog logger.
var replaceFunc func(groups []string, a slog.Attr) slog.Attr
if !e.configuration.LogConsoleTimestamps {
@ -137,7 +137,7 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
}
// Put the message on the channel to the ringbuffer.
ringBufferBulkInCh <- []subjectAndMessage{sam}
ringBufferBulkInCh <- sam
// if errEvent.process.configuration.EnableDebug {
// log.Printf("%v\n", er)

14
go.mod
View file

@ -3,21 +3,21 @@ module github.com/postmannen/ctrl
go 1.22
require (
github.com/fsnotify/fsnotify v1.6.0
github.com/fsnotify/fsnotify v1.8.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/go-playground/validator/v10 v10.10.1
github.com/google/uuid v1.3.0
github.com/hpcloud/tail v1.0.0
github.com/jinzhu/copier v0.4.0
github.com/joho/godotenv v1.5.1
github.com/klauspost/compress v1.17.0
github.com/klauspost/compress v1.17.2
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.25.0
github.com/nats-io/nkeys v0.4.4
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.14.0
go.etcd.io/bbolt v1.3.7
golang.org/x/crypto v0.7.0
golang.org/x/crypto v0.18.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
gopkg.in/yaml.v3 v3.0.1
)
@ -39,8 +39,8 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect

17
go.sum
View file

@ -13,6 +13,8 @@ github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE=
github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
@ -45,6 +47,8 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
@ -65,9 +69,13 @@ github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBri
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@ -102,6 +110,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@ -116,14 +126,21 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -21,6 +21,9 @@ type Message struct {
// With other words, a message that exists within ctrl is always
// for just for a single node.
ToNodes []Node `json:"toNodes,omitempty" yaml:"toNodes,omitempty"`
// JetstreamToNode, the topic used to prefix the stream name with
// with the format NODES.<JetstreamToNode> .
JetstreamToNode Node `json:"jetstreamToNode" yaml:"jetstreamToNode"`
// The Unique ID of the message
ID int `json:"id" yaml:"id"`
// The actual data in the message. This is typically where we

View file

@ -10,8 +10,11 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"github.com/nats-io/nats.go/jetstream"
"gopkg.in/yaml.v3"
)
@ -119,6 +122,114 @@ func (s *server) readStartupFolder() {
}
func (s *server) jetstreamPublish() {
// Create a JetStream management interface
js, _ := jetstream.New(s.natsConn)
// Create a stream
_, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{
Name: "NODES",
Subjects: []string{"NODES.>"},
// TODO: Create Flag ?
MaxMsgsPerSubject: 100,
// MaxMsgsPerSubject: 1,
})
// Publish messages.
for {
select {
case jsMSG := <-s.jetstreamPublishCh:
b, err := json.Marshal(jsMSG)
if err != nil {
log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
}
subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode))
_, err = js.Publish(s.ctx, subject, b)
if err != nil {
log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
}
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG)
case <-s.ctx.Done():
}
}
}
func (s *server) jetstreamConsume() {
// Create a JetStream management interface
js, _ := jetstream.New(s.natsConn)
// Create a stream
stream, err := js.CreateOrUpdateStream(s.ctx, jetstream.StreamConfig{
Name: "NODES",
Subjects: []string{"NODES.>"},
})
if err != nil {
log.Printf("error: jetstreamConsume: failed to create stream: %v\n", err)
}
// The standard streams we want to consume.
filterSubjectValues := []string{
fmt.Sprintf("NODES.%v", s.nodeName),
"NODES.all",
}
// Check if there are more to consume defined in flags/env.
if s.configuration.JetstreamsConsume != "" {
splitValues := strings.Split(s.configuration.JetstreamsConsume, ",")
for _, v := range splitValues {
filterSubjectValues = append(filterSubjectValues, fmt.Sprintf("NODES.%v", v))
}
}
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues)
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{
Name: s.nodeName,
Durable: s.nodeName,
FilterSubjects: filterSubjectValues,
})
if err != nil {
log.Fatalf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v\n", err)
}
consumeContext, _ := cons.Consume(func(msg jetstream.Msg) {
er := fmt.Errorf("jetstreamConsume: jetstream msg received: subject %q, data: %q", msg.Subject(), string(msg.Data()))
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
msg.Ack()
m := Message{}
err := json.Unmarshal(msg.Data(), &m)
if err != nil {
er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
return
}
// From here it is the normal message logic that applies, and since messages received
// via jetstream are to be handled by the node it was consumed we set the current
// nodeName of the consumer in the ctrl Message, so we are sure it is handled locally.
m.ToNode = Node(s.nodeName)
sam, err := newSubjectAndMessage(m)
if err != nil {
er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
return
}
// If a message is received via
s.samSendLocalCh <- []subjectAndMessage{sam}
})
defer consumeContext.Stop()
<-s.ctx.Done()
}
// getFilePaths will get the names of all the messages in
// the folder specified from current working directory.
func (s *server) getFilePaths(dirName string) ([]string, error) {
@ -208,10 +319,12 @@ func (s *server) readSocket() {
// for auditing.
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
s.newMessagesCh <- sams[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sams
s.auditLogCh <- sams
}(conn)
@ -246,7 +359,8 @@ func (s *server) readFolder() {
return
}
if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod {
if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
time.Sleep(time.Millisecond * 250)
er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op)
s.errorKernel.logDebug(er)
@ -267,6 +381,8 @@ func (s *server) readFolder() {
}
fh.Close()
fmt.Printf("------- DEBUG: %v\n", b)
b = bytes.Trim(b, "\x00")
// unmarshal the JSON into a struct
@ -287,13 +403,24 @@ func (s *server) readFolder() {
// for auditing.
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
// Check if it is a message to publish with Jetstream.
if sams[i].Message.JetstreamToNode != "" {
s.jetstreamPublishCh <- sams[i].Message
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams)
s.errorKernel.logDebug(er)
continue
}
s.newMessagesCh <- sams[i]
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
s.errorKernel.logDebug(er)
}
er := fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
s.errorKernel.logDebug(er)
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sams
s.auditLogCh <- sams
// Delete the file.
@ -383,10 +510,10 @@ func (s *server) readTCPListener() {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
s.newMessagesCh <- sams[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sams
s.auditLogCh <- sams
}(conn)
@ -428,10 +555,10 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sams[i].Message.FromNode = Node(s.nodeName)
s.newMessagesCh <- sams[i]
}
// Send the SAM struct to be picked up by the ring buffer.
s.newMessagesCh <- sams
s.auditLogCh <- sams
}

View file

@ -88,7 +88,7 @@ type process struct {
// copy of the configuration from server
configuration *Configuration
// The new messages channel copied from *Server
newMessagesCh chan<- []subjectAndMessage
newMessagesCh chan<- subjectAndMessage
// The structure who holds all processes information
processes *processes
// nats connection
@ -508,6 +508,9 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
message := Message{}
// TODO: Jetstream
// Use CBOR and Compression for all messages, and drop the use of the header fields.
// Check if serialization is specified.
// Will default to gob serialization if nothing or non existing value is specified.
if val, ok := msg.Header["serial"]; ok {

View file

@ -191,7 +191,7 @@ func (p *processes) Start(proc process) {
er := fmt.Errorf("error: ProcessesStart: %v", err)
p.errorKernel.errSend(proc, m, er, logError)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
select {
case <-ticker.C:
@ -241,7 +241,7 @@ func (p *processes) Start(proc process) {
// In theory the system should drop the message before it reaches here.
p.errorKernel.errSend(proc, m, err, logError)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
select {
case <-ticker.C:
@ -290,7 +290,7 @@ func (p *processes) Start(proc process) {
p.errorKernel.errSend(proc, m, err, logError)
log.Printf("error: ProcessesStart: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
select {
case <-ticker.C:

View file

@ -359,7 +359,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
proc.errorKernel.errSend(proc, message, er, logError)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
}
// selectFileNaming will figure out the correct naming of the file

View file

@ -92,7 +92,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
}
@ -267,7 +267,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
cancel()
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)
@ -566,7 +566,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
resendRetries = 0
@ -636,7 +636,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
resendRetries++
@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
}
// Open a tmp folder for where to write the received chunks
@ -801,7 +801,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
case copyResendLast:
// The csa already contains copyStatus copyResendLast when reached here,
@ -834,7 +834,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
case copySrcDone:
err := func() error {
@ -988,7 +988,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return er
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
}
cancel()

View file

@ -334,7 +334,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
proc.errorKernel.logDebug(er)
@ -381,7 +381,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
proc.errorKernel.errSend(proc, message, er, logWarning)
}
proc.newMessagesCh <- []subjectAndMessage{sam}
proc.newMessagesCh <- sam
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
proc.errorKernel.logDebug(er)

View file

@ -289,7 +289,7 @@ func TestRequest(t *testing.T) {
t.Fatalf("newSubjectAndMessage failed: %v\n", err)
}
tstSrv.newMessagesCh <- []subjectAndMessage{sam}
tstSrv.newMessagesCh <- sam
case viaSocket:
msgs := []Message{tt.message}

224
server.go
View file

@ -48,9 +48,11 @@ type server struct {
//
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
newMessagesCh chan []subjectAndMessage
newMessagesCh chan subjectAndMessage
// directSAMSCh
samSendLocalCh chan []subjectAndMessage
// Channel for messages to publish with Jetstream.
jetstreamPublishCh chan Message
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
@ -211,21 +213,22 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
//}
s := server{
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan []subjectAndMessage),
samSendLocalCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
ctx: ctx,
cancel: cancel,
configuration: configuration,
nodeName: configuration.NodeName,
natsConn: conn,
ctrlSocket: ctrlSocket,
newMessagesCh: make(chan subjectAndMessage),
samSendLocalCh: make(chan []subjectAndMessage),
jetstreamPublishCh: make(chan Message),
metrics: metrics,
version: version,
errorKernel: errorKernel,
nodeAuth: nodeAuth,
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
}
s.processes = newProcesses(ctx, &s)
@ -344,6 +347,10 @@ func (s *server) Start() {
time.Sleep(time.Second * 1)
s.processes.printProcessesMap()
// Start Jetstream publisher and consumer.
go s.jetstreamPublish()
go s.jetstreamConsume()
// Start exposing the the data folder via HTTP if flag is set.
if s.configuration.ExposeDataFolder != "" {
log.Printf("info: Starting expose of data folder via HTTP\n")
@ -485,115 +492,118 @@ func (s *server) routeMessagesToProcess() {
methodsAvailable := method.GetMethodsAvailable()
go func() {
for samSlice := range s.newMessagesCh {
for _, sam := range samSlice {
for sam := range s.newMessagesCh {
go func(sam subjectAndMessage) {
s.messageID.mu.Lock()
s.messageID.id++
sam.Message.ID = s.messageID.id
s.messageID.mu.Unlock()
go func(sam subjectAndMessage) {
// TODO: Jetstream
// Check if Jetstream stream are specified,
// and send off to Jetstream publisher.
s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID))
s.messageID.mu.Lock()
s.messageID.id++
sam.Message.ID = s.messageID.id
s.messageID.mu.Unlock()
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
return
}
s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID))
switch {
case sam.Message.Retries < 0:
sam.Message.Retries = s.configuration.DefaultMessageRetries
}
if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout
}
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
return
}
// ---
switch {
case sam.Message.Retries < 0:
sam.Message.Retries = s.configuration.DefaultMessageRetries
}
if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout
}
m := sam.Message
// ---
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
m := sam.Message
sendOK := func() bool {
var ctxCanceled bool
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
s.processes.active.mu.Lock()
defer s.processes.active.mu.Unlock()
sendOK := func() bool {
var ctxCanceled bool
// Check if the process exist, if it do not exist return false so a
// new publisher process will be created.
proc, ok := s.processes.active.procNames[pn]
if !ok {
return false
}
s.processes.active.mu.Lock()
defer s.processes.active.mu.Unlock()
if proc.ctx.Err() != nil {
ctxCanceled = true
}
if ok && ctxCanceled {
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
s.errorKernel.logDebug(er)
delete(proc.processes.active.procNames, proc.processName)
return false
}
// If found in map above, and go routine for publishing is running,
// put the message on that processes incomming message channel.
if ok && !ctxCanceled {
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
return true
}
// The process was not found, so we return false here so a new publisher
// process will be created later.
// Check if the process exist, if it do not exist return false so a
// new publisher process will be created.
proc, ok := s.processes.active.procNames[pn]
if !ok {
return false
}()
if sendOK {
return
}
er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
s.errorKernel.logDebug(er)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher)
if proc.ctx.Err() != nil {
ctxCanceled = true
}
proc.spawnWorker()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logDebug(er)
// Now when the process is spawned we continue,
// and send the message to that new process.
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
if ok && ctxCanceled {
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
s.errorKernel.logDebug(er)
delete(proc.processes.active.procNames, proc.processName)
return false
}
}(sam)
}
// If found in map above, and go routine for publishing is running,
// put the message on that processes incomming message channel.
if ok && !ctxCanceled {
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
return true
}
// The process was not found, so we return false here so a new publisher
// process will be created later.
return false
}()
if sendOK {
return
}
er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
s.errorKernel.logDebug(er)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher)
}
proc.spawnWorker()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logDebug(er)
// Now when the process is spawned we continue,
// and send the message to that new process.
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
}(sam)
}
}()
}