2023-10-04 20:58:42 +00:00
|
|
|
package ctrl
|
2021-02-02 12:06:37 +00:00
|
|
|
|
|
|
|
import (
|
2021-03-29 11:36:30 +00:00
|
|
|
"bytes"
|
2023-04-15 06:33:35 +00:00
|
|
|
"encoding/json"
|
2021-02-02 12:06:37 +00:00
|
|
|
"fmt"
|
2021-08-23 14:00:48 +00:00
|
|
|
"io"
|
2023-04-15 06:33:35 +00:00
|
|
|
"log"
|
2021-08-23 14:00:48 +00:00
|
|
|
"net"
|
2021-09-10 03:26:16 +00:00
|
|
|
"net/http"
|
2021-08-23 14:00:48 +00:00
|
|
|
"os"
|
2022-01-26 08:23:02 +00:00
|
|
|
"path/filepath"
|
2024-11-27 07:34:49 +00:00
|
|
|
"strings"
|
|
|
|
"time"
|
2022-03-04 12:22:55 +00:00
|
|
|
|
2023-01-08 07:32:58 +00:00
|
|
|
"github.com/fsnotify/fsnotify"
|
2024-11-27 07:34:49 +00:00
|
|
|
"github.com/nats-io/nats.go/jetstream"
|
2023-01-08 07:32:58 +00:00
|
|
|
|
2022-03-04 12:22:55 +00:00
|
|
|
"gopkg.in/yaml.v3"
|
2021-02-02 12:06:37 +00:00
|
|
|
)
|
|
|
|
|
2023-10-04 20:58:42 +00:00
|
|
|
// readStartupFolder will check the <workdir>/startup folder when ctrl
|
2022-01-26 08:23:02 +00:00
|
|
|
// starts for messages to process.
|
|
|
|
// The purpose of the startup folder is that we can define messages on a
|
2023-10-04 20:58:42 +00:00
|
|
|
// node that will be run when ctrl starts up.
|
2022-01-26 08:23:02 +00:00
|
|
|
// Messages defined in the startup folder should have the toNode set to
|
|
|
|
// self, and the from node set to where we want the answer sent. The reason
|
|
|
|
// for this is that all replies normally pick up the host from the original
|
|
|
|
// first message, but here we inject it on an end node so we need to specify
|
|
|
|
// the fromNode to get the reply back to the node we want.
|
2022-03-16 10:05:31 +00:00
|
|
|
//
|
|
|
|
// Messages read from the startup folder will be directly called by the handler
|
|
|
|
// locally, and the message will not be sent via the nats-server.
|
2022-01-26 08:23:02 +00:00
|
|
|
func (s *server) readStartupFolder() {
|
|
|
|
|
|
|
|
// Get the names of all the files in the startup folder.
|
|
|
|
const startupFolder = "startup"
|
|
|
|
filePaths, err := s.getFilePaths(startupFolder)
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: readStartupFolder: unable to get filenames: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2022-01-26 08:23:02 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-09-08 05:37:44 +00:00
|
|
|
for _, fp := range filePaths {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("info: ranging filepaths, current filePath contains: %v", fp)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logInfo(er)
|
2022-09-08 05:37:44 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 08:23:02 +00:00
|
|
|
for _, filePath := range filePaths {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("info: reading and working on file from startup folder %v", filePath)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logInfo(er)
|
2022-01-26 08:23:02 +00:00
|
|
|
|
|
|
|
// Read the content of each file.
|
|
|
|
readBytes, err := func(filePath string) ([]byte, error) {
|
|
|
|
fh, err := os.Open(filePath)
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to open file in startup folder: %v", err)
|
|
|
|
return nil, er
|
|
|
|
}
|
|
|
|
defer fh.Close()
|
|
|
|
|
|
|
|
b, err := io.ReadAll(fh)
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to read file in startup folder: %v", err)
|
|
|
|
return nil, er
|
|
|
|
}
|
|
|
|
|
|
|
|
return b, nil
|
|
|
|
}(filePath)
|
|
|
|
|
|
|
|
if err != nil {
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, err, logWarning)
|
2022-01-26 08:23:02 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = bytes.Trim(readBytes, "\x00")
|
|
|
|
|
|
|
|
// unmarshal the JSON into a struct
|
|
|
|
sams, err := s.convertBytesToSAMs(readBytes)
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2022-01-26 08:23:02 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if fromNode field is specified, and remove the message if blank.
|
|
|
|
for i := range sams {
|
2023-04-15 06:33:35 +00:00
|
|
|
// We want to allow the use of nodeName local only in startup folder, and
|
|
|
|
// if used we substite it for the local node name.
|
|
|
|
if sams[i].Message.ToNode == "local" {
|
|
|
|
sams[i].Message.ToNode = Node(s.nodeName)
|
|
|
|
sams[i].Subject.ToNode = s.nodeName
|
|
|
|
}
|
|
|
|
|
2022-11-30 06:58:41 +00:00
|
|
|
switch {
|
|
|
|
case sams[i].Message.FromNode == "":
|
2023-04-15 06:33:35 +00:00
|
|
|
// Remove the first message from the slice.
|
2022-01-26 08:23:02 +00:00
|
|
|
sams = append(sams[:i], sams[i+1:]...)
|
2022-11-30 06:58:41 +00:00
|
|
|
er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message")
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2022-11-30 06:58:41 +00:00
|
|
|
|
|
|
|
case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0:
|
2023-04-15 06:33:35 +00:00
|
|
|
// Remove the first message from the slice.
|
2022-11-30 06:58:41 +00:00
|
|
|
sams = append(sams[:i], sams[i+1:]...)
|
|
|
|
er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message")
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2022-01-26 08:23:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2023-04-15 06:33:35 +00:00
|
|
|
j, err := json.MarshalIndent(sams, "", " ")
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("test error: %v\n", err)
|
|
|
|
}
|
|
|
|
er = fmt.Errorf("%v", string(j))
|
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
|
|
|
|
2023-10-04 20:58:42 +00:00
|
|
|
s.samSendLocalCh <- sams
|
2022-01-26 08:23:02 +00:00
|
|
|
|
|
|
|
}
|
2022-09-08 05:37:44 +00:00
|
|
|
|
2022-01-26 08:23:02 +00:00
|
|
|
}
|
|
|
|
|
2024-11-27 07:34:49 +00:00
|
|
|
func (s *server) jetstreamPublish() {
|
|
|
|
// Create a JetStream management interface
|
|
|
|
js, _ := jetstream.New(s.natsConn)
|
|
|
|
|
|
|
|
// Create a stream
|
|
|
|
_, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{
|
2024-11-27 07:54:17 +00:00
|
|
|
Name: "NODES",
|
|
|
|
Subjects: []string{"NODES.>"},
|
|
|
|
MaxMsgsPerSubject: int64(s.configuration.JetStreamMaxMsgsPerSubject),
|
2024-11-27 07:34:49 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
// Publish messages.
|
|
|
|
for {
|
|
|
|
select {
|
2024-11-27 10:30:43 +00:00
|
|
|
case msg := <-s.jetstreamPublishCh:
|
|
|
|
|
|
|
|
b, err := s.messageSerializeAndCompress(msg)
|
2024-11-27 07:34:49 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
|
|
|
|
}
|
|
|
|
|
2024-11-27 10:30:43 +00:00
|
|
|
subject := string(fmt.Sprintf("NODES.%v", msg.JetstreamToNode))
|
2024-11-27 07:34:49 +00:00
|
|
|
_, err = js.Publish(s.ctx, subject, b)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
|
|
|
|
}
|
|
|
|
|
2024-11-27 10:30:43 +00:00
|
|
|
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, msg)
|
2024-11-27 07:34:49 +00:00
|
|
|
case <-s.ctx.Done():
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) jetstreamConsume() {
|
|
|
|
// Create a JetStream management interface
|
|
|
|
js, _ := jetstream.New(s.natsConn)
|
|
|
|
|
|
|
|
// Create a stream
|
|
|
|
stream, err := js.CreateOrUpdateStream(s.ctx, jetstream.StreamConfig{
|
|
|
|
Name: "NODES",
|
|
|
|
Subjects: []string{"NODES.>"},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error: jetstreamConsume: failed to create stream: %v\n", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The standard streams we want to consume.
|
|
|
|
filterSubjectValues := []string{
|
|
|
|
fmt.Sprintf("NODES.%v", s.nodeName),
|
|
|
|
"NODES.all",
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if there are more to consume defined in flags/env.
|
|
|
|
if s.configuration.JetstreamsConsume != "" {
|
|
|
|
splitValues := strings.Split(s.configuration.JetstreamsConsume, ",")
|
|
|
|
for _, v := range splitValues {
|
|
|
|
filterSubjectValues = append(filterSubjectValues, fmt.Sprintf("NODES.%v", v))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-11-27 12:16:59 +00:00
|
|
|
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %v", filterSubjectValues)
|
2024-11-27 07:34:49 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
|
|
|
|
|
|
|
cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{
|
|
|
|
Name: s.nodeName,
|
|
|
|
Durable: s.nodeName,
|
|
|
|
FilterSubjects: filterSubjectValues,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v\n", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
consumeContext, _ := cons.Consume(func(msg jetstream.Msg) {
|
|
|
|
er := fmt.Errorf("jetstreamConsume: jetstream msg received: subject %q, data: %q", msg.Subject(), string(msg.Data()))
|
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
|
|
|
|
|
|
|
msg.Ack()
|
|
|
|
|
2024-11-27 11:54:06 +00:00
|
|
|
m, err := s.messageDeserializeAndUncompress(msg.Data())
|
2024-11-27 07:34:49 +00:00
|
|
|
if err != nil {
|
2024-11-27 10:30:43 +00:00
|
|
|
er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
|
2024-11-27 07:34:49 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// From here it is the normal message logic that applies, and since messages received
|
|
|
|
// via jetstream are to be handled by the node it was consumed we set the current
|
|
|
|
// nodeName of the consumer in the ctrl Message, so we are sure it is handled locally.
|
|
|
|
m.ToNode = Node(s.nodeName)
|
|
|
|
|
|
|
|
sam, err := newSubjectAndMessage(m)
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: jetstreamConsume: newSubjectAndMessage failed: %v", err)
|
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// If a message is received via
|
|
|
|
s.samSendLocalCh <- []subjectAndMessage{sam}
|
|
|
|
})
|
|
|
|
defer consumeContext.Stop()
|
|
|
|
|
|
|
|
<-s.ctx.Done()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2022-01-26 08:23:02 +00:00
|
|
|
// getFilePaths will get the names of all the messages in
|
|
|
|
// the folder specified from current working directory.
|
|
|
|
func (s *server) getFilePaths(dirName string) ([]string, error) {
|
2022-02-22 08:41:59 +00:00
|
|
|
dirPath, err := os.Executable()
|
|
|
|
dirPath = filepath.Dir(dirPath)
|
2022-01-26 08:23:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
dirPath = filepath.Join(dirPath, dirName)
|
|
|
|
|
|
|
|
// Check if the startup folder exist.
|
|
|
|
if _, err := os.Stat(dirPath); os.IsNotExist(err) {
|
2023-01-10 05:50:28 +00:00
|
|
|
err := os.MkdirAll(dirPath, 0770)
|
2022-01-26 08:23:02 +00:00
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to create startup folder: %v", err)
|
|
|
|
return nil, er
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-06 08:59:07 +00:00
|
|
|
fInfo, err := os.ReadDir(dirPath)
|
2022-01-26 08:23:02 +00:00
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to get filenames in startup folder: %v", err)
|
|
|
|
return nil, er
|
|
|
|
}
|
|
|
|
|
|
|
|
filePaths := []string{}
|
|
|
|
|
|
|
|
for _, v := range fInfo {
|
|
|
|
realpath := filepath.Join(dirPath, v.Name())
|
|
|
|
filePaths = append(filePaths, realpath)
|
|
|
|
}
|
|
|
|
|
|
|
|
return filePaths, nil
|
|
|
|
}
|
|
|
|
|
2021-03-29 11:36:30 +00:00
|
|
|
// readSocket will read the .sock file specified.
|
|
|
|
// It will take a channel of []byte as input, and it is in this
|
|
|
|
// channel the content of a file that has changed is returned.
|
2021-08-25 08:16:55 +00:00
|
|
|
func (s *server) readSocket() {
|
2021-03-29 11:36:30 +00:00
|
|
|
// Loop, and wait for new connections.
|
|
|
|
for {
|
2023-10-04 20:58:42 +00:00
|
|
|
conn, err := s.ctrlSocket.Accept()
|
2021-02-05 06:25:12 +00:00
|
|
|
if err != nil {
|
2021-03-29 11:36:30 +00:00
|
|
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
2021-02-05 06:25:12 +00:00
|
|
|
}
|
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
go func(conn net.Conn) {
|
|
|
|
defer conn.Close()
|
2021-02-05 09:47:07 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
var readBytes []byte
|
2021-03-29 11:36:30 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
for {
|
|
|
|
b := make([]byte, 1500)
|
|
|
|
_, err = conn.Read(b)
|
|
|
|
if err != nil && err != io.EOF {
|
2022-05-23 05:43:34 +00:00
|
|
|
er := fmt.Errorf("error: failed to read data from socket: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-08-23 14:08:21 +00:00
|
|
|
return
|
|
|
|
}
|
2021-02-02 12:06:37 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
readBytes = append(readBytes, b...)
|
2021-03-29 11:36:30 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = bytes.Trim(readBytes, "\x00")
|
|
|
|
|
|
|
|
// unmarshal the JSON into a struct
|
2021-08-26 04:35:54 +00:00
|
|
|
sams, err := s.convertBytesToSAMs(readBytes)
|
2021-08-23 14:08:21 +00:00
|
|
|
if err != nil {
|
2022-04-04 03:34:18 +00:00
|
|
|
er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-08-23 14:08:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-08-25 08:16:55 +00:00
|
|
|
for i := range sams {
|
2021-08-23 14:08:21 +00:00
|
|
|
|
|
|
|
// Fill in the value for the FromNode field, so the receiver
|
|
|
|
// can check this field to know where it came from.
|
2021-08-25 08:16:55 +00:00
|
|
|
sams[i].Message.FromNode = Node(s.nodeName)
|
2022-03-22 13:02:25 +00:00
|
|
|
|
|
|
|
// Send an info message to the central about the message picked
|
|
|
|
// for auditing.
|
|
|
|
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
2024-11-27 07:34:49 +00:00
|
|
|
|
|
|
|
s.newMessagesCh <- sams[i]
|
2021-08-23 14:08:21 +00:00
|
|
|
}
|
2021-02-05 06:25:12 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
// Send the SAM struct to be picked up by the ring buffer.
|
2024-11-27 07:34:49 +00:00
|
|
|
|
2023-10-04 20:58:42 +00:00
|
|
|
s.auditLogCh <- sams
|
2021-03-31 06:56:13 +00:00
|
|
|
|
2021-08-23 14:08:21 +00:00
|
|
|
}(conn)
|
2021-02-05 06:25:12 +00:00
|
|
|
}
|
2021-02-02 12:06:37 +00:00
|
|
|
}
|
|
|
|
|
2023-01-08 07:32:58 +00:00
|
|
|
// readFolder
|
|
|
|
func (s *server) readFolder() {
|
|
|
|
// Check if the startup folder exist.
|
|
|
|
if _, err := os.Stat(s.configuration.ReadFolder); os.IsNotExist(err) {
|
2023-01-10 05:50:28 +00:00
|
|
|
err := os.MkdirAll(s.configuration.ReadFolder, 0770)
|
2023-01-08 07:32:58 +00:00
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to create readfolder folder: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2023-01-08 07:32:58 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
watcher, err := fsnotify.NewWatcher()
|
|
|
|
if err != nil {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("main: failed to create new logWatcher: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2023-01-08 07:32:58 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start listening for events.
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case event, ok := <-watcher.Events:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-11-27 07:34:49 +00:00
|
|
|
if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
|
|
|
|
time.Sleep(time.Millisecond * 250)
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("readFolder: got file event, name: %v, op: %v", event.Name, event.Op)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logDebug(er)
|
2023-01-08 07:32:58 +00:00
|
|
|
|
2023-01-09 02:59:13 +00:00
|
|
|
func() {
|
|
|
|
fh, err := os.Open(event.Name)
|
|
|
|
if err != nil {
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("error: readFolder: failed to open readFile from readFolder: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-09 02:59:13 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
b, err := io.ReadAll(fh)
|
|
|
|
if err != nil {
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("error: readFolder: failed to readall from readFolder: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-09 02:59:13 +00:00
|
|
|
fh.Close()
|
|
|
|
return
|
|
|
|
}
|
2023-01-08 07:32:58 +00:00
|
|
|
fh.Close()
|
2023-01-09 02:59:13 +00:00
|
|
|
|
2024-11-27 07:34:49 +00:00
|
|
|
fmt.Printf("------- DEBUG: %v\n", b)
|
|
|
|
|
2023-01-09 02:59:13 +00:00
|
|
|
b = bytes.Trim(b, "\x00")
|
|
|
|
|
|
|
|
// unmarshal the JSON into a struct
|
|
|
|
sams, err := s.convertBytesToSAMs(b)
|
|
|
|
if err != nil {
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-09 02:59:13 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range sams {
|
|
|
|
|
|
|
|
// Fill in the value for the FromNode field, so the receiver
|
|
|
|
// can check this field to know where it came from.
|
|
|
|
sams[i].Message.FromNode = Node(s.nodeName)
|
|
|
|
|
|
|
|
// Send an info message to the central about the message picked
|
|
|
|
// for auditing.
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-09 02:59:13 +00:00
|
|
|
|
2024-11-27 07:34:49 +00:00
|
|
|
// Check if it is a message to publish with Jetstream.
|
|
|
|
if sams[i].Message.JetstreamToNode != "" {
|
|
|
|
|
|
|
|
s.jetstreamPublishCh <- sams[i].Message
|
|
|
|
er = fmt.Errorf("readFolder: read new JETSTREAM message in readfolder and putting it on s.jetstreamPublishCh: %#v", sams)
|
|
|
|
s.errorKernel.logDebug(er)
|
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
s.newMessagesCh <- sams[i]
|
|
|
|
|
|
|
|
er = fmt.Errorf("readFolder: read new message in readfolder and putting it on s.samToSendCh: %#v", sams)
|
|
|
|
s.errorKernel.logDebug(er)
|
|
|
|
}
|
2024-03-10 06:24:09 +00:00
|
|
|
|
2023-01-09 02:59:13 +00:00
|
|
|
// Send the SAM struct to be picked up by the ring buffer.
|
2023-10-04 20:58:42 +00:00
|
|
|
s.auditLogCh <- sams
|
2023-01-09 02:59:13 +00:00
|
|
|
|
|
|
|
// Delete the file.
|
|
|
|
err = os.Remove(event.Name)
|
|
|
|
if err != nil {
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("error: readFolder: failed to remove readFile from readFolder: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-09 02:59:13 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
}()
|
|
|
|
}
|
2023-01-08 07:32:58 +00:00
|
|
|
|
|
|
|
case err, ok := <-watcher.Errors:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
2023-01-09 08:10:52 +00:00
|
|
|
er := fmt.Errorf("error: readFolder: file watcher error: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2023-01-08 07:32:58 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Add a path.
|
|
|
|
err = watcher.Add(s.configuration.ReadFolder)
|
|
|
|
if err != nil {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("startLogsWatcher: failed to add watcher: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2023-01-08 07:32:58 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-23 14:00:48 +00:00
|
|
|
// readTCPListener wait and read messages delivered on the TCP
|
|
|
|
// port if started.
|
|
|
|
// It will take a channel of []byte as input, and it is in this
|
|
|
|
// channel the content of a file that has changed is returned.
|
2021-09-09 11:32:04 +00:00
|
|
|
func (s *server) readTCPListener() {
|
2021-08-23 14:00:48 +00:00
|
|
|
ln, err := net.Listen("tcp", s.configuration.TCPListener)
|
|
|
|
if err != nil {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("error: readTCPListener: failed to start tcp listener: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2021-08-23 14:00:48 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
// Loop, and wait for new connections.
|
|
|
|
for {
|
|
|
|
|
|
|
|
conn, err := ln.Accept()
|
|
|
|
if err != nil {
|
|
|
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
2021-08-23 14:00:48 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
go func(conn net.Conn) {
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
var readBytes []byte
|
|
|
|
|
|
|
|
for {
|
|
|
|
b := make([]byte, 1500)
|
|
|
|
_, err = conn.Read(b)
|
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-08-23 14:00:48 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = append(readBytes, b...)
|
|
|
|
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = bytes.Trim(readBytes, "\x00")
|
|
|
|
|
|
|
|
// unmarshal the JSON into a struct
|
2023-10-04 20:58:42 +00:00
|
|
|
sams, err := s.convertBytesToSAMs(readBytes)
|
2021-08-23 14:00:48 +00:00
|
|
|
if err != nil {
|
2021-09-23 06:19:53 +00:00
|
|
|
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-08-23 14:00:48 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-10-04 20:58:42 +00:00
|
|
|
for i := range sams {
|
2021-08-23 14:00:48 +00:00
|
|
|
|
|
|
|
// Fill in the value for the FromNode field, so the receiver
|
|
|
|
// can check this field to know where it came from.
|
2023-10-04 20:58:42 +00:00
|
|
|
sams[i].Message.FromNode = Node(s.nodeName)
|
2024-11-27 07:34:49 +00:00
|
|
|
s.newMessagesCh <- sams[i]
|
2021-08-23 14:00:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Send the SAM struct to be picked up by the ring buffer.
|
2023-10-04 20:58:42 +00:00
|
|
|
s.auditLogCh <- sams
|
2021-08-23 14:00:48 +00:00
|
|
|
|
|
|
|
}(conn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-10 03:26:16 +00:00
|
|
|
func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) {
|
2021-09-10 04:06:43 +00:00
|
|
|
|
|
|
|
var readBytes []byte
|
|
|
|
|
|
|
|
for {
|
|
|
|
b := make([]byte, 1500)
|
|
|
|
_, err := r.Body.Read(b)
|
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-09-10 04:06:43 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = append(readBytes, b...)
|
|
|
|
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
readBytes = bytes.Trim(readBytes, "\x00")
|
|
|
|
|
|
|
|
// unmarshal the JSON into a struct
|
2023-10-04 20:58:42 +00:00
|
|
|
sams, err := s.convertBytesToSAMs(readBytes)
|
2021-09-10 03:26:16 +00:00
|
|
|
if err != nil {
|
2021-09-23 06:19:53 +00:00
|
|
|
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
2021-09-10 04:06:43 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-10-04 20:58:42 +00:00
|
|
|
for i := range sams {
|
2021-09-10 04:06:43 +00:00
|
|
|
|
|
|
|
// Fill in the value for the FromNode field, so the receiver
|
|
|
|
// can check this field to know where it came from.
|
2023-10-04 20:58:42 +00:00
|
|
|
sams[i].Message.FromNode = Node(s.nodeName)
|
2024-11-27 07:34:49 +00:00
|
|
|
s.newMessagesCh <- sams[i]
|
2021-09-10 03:26:16 +00:00
|
|
|
}
|
|
|
|
|
2021-09-10 04:06:43 +00:00
|
|
|
// Send the SAM struct to be picked up by the ring buffer.
|
2023-10-04 20:58:42 +00:00
|
|
|
s.auditLogCh <- sams
|
2021-09-10 03:26:16 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) readHttpListener() {
|
|
|
|
go func() {
|
|
|
|
n, err := net.Listen("tcp", s.configuration.HTTPListener)
|
|
|
|
if err != nil {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("error: startMetrics: failed to open prometheus listen port: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2021-09-10 03:26:16 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
2021-09-10 03:45:09 +00:00
|
|
|
mux.HandleFunc("/", s.readHTTPlistenerHandler)
|
2021-09-10 03:26:16 +00:00
|
|
|
|
|
|
|
err = http.Serve(n, mux)
|
|
|
|
if err != nil {
|
2023-01-12 11:01:01 +00:00
|
|
|
er := fmt.Errorf("error: startMetrics: failed to start http.Serve: %v", err)
|
2024-03-27 11:48:17 +00:00
|
|
|
s.errorKernel.logError(er)
|
2021-09-10 03:26:16 +00:00
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2021-08-16 11:01:12 +00:00
|
|
|
// The subject are made up of different parts of the message field.
|
|
|
|
// To make things easier and to avoid figuring out what the subject
|
|
|
|
// is in all places we've created the concept of subjectAndMessage
|
|
|
|
// (sam) where we get the subject for the message once, and use the
|
|
|
|
// sam structure with subject alongside the message instead.
|
2021-02-10 09:14:49 +00:00
|
|
|
type subjectAndMessage struct {
|
2021-02-04 10:46:58 +00:00
|
|
|
Subject `json:"subject" yaml:"subject"`
|
|
|
|
Message `json:"message" yaml:"message"`
|
2021-02-03 21:08:28 +00:00
|
|
|
}
|
|
|
|
|
2021-08-25 08:16:55 +00:00
|
|
|
// convertBytesToSAMs will range over the byte representing a message given in
|
2021-03-29 11:36:30 +00:00
|
|
|
// json format. For each element found the Message type will be converted into
|
|
|
|
// a SubjectAndMessage type value and appended to a slice, and the slice is
|
|
|
|
// returned to the caller.
|
2021-08-26 04:35:54 +00:00
|
|
|
func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
2021-02-10 09:14:49 +00:00
|
|
|
MsgSlice := []Message{}
|
2021-02-03 21:08:28 +00:00
|
|
|
|
2022-03-04 12:22:55 +00:00
|
|
|
err := yaml.Unmarshal(b, &MsgSlice)
|
2021-02-03 21:08:28 +00:00
|
|
|
if err != nil {
|
2021-02-04 10:46:58 +00:00
|
|
|
return nil, fmt.Errorf("error: unmarshal of file failed: %#v", err)
|
2021-02-03 21:08:28 +00:00
|
|
|
}
|
|
|
|
|
2021-08-26 05:02:36 +00:00
|
|
|
// Check for toNode and toNodes field.
|
2021-08-26 04:35:54 +00:00
|
|
|
MsgSlice = s.checkMessageToNodes(MsgSlice)
|
2021-08-26 08:50:40 +00:00
|
|
|
s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice)))
|
2021-08-26 04:35:54 +00:00
|
|
|
|
|
|
|
sam := []subjectAndMessage{}
|
|
|
|
|
|
|
|
// Range over all the messages parsed from json, and create a subject for
|
|
|
|
// each message.
|
|
|
|
for _, m := range MsgSlice {
|
|
|
|
sm, err := newSubjectAndMessage(m)
|
|
|
|
if err != nil {
|
2022-02-18 08:51:11 +00:00
|
|
|
er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, m, er, logWarning)
|
2022-02-18 08:51:11 +00:00
|
|
|
|
2021-08-26 04:35:54 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
sam = append(sam, sm)
|
|
|
|
}
|
|
|
|
|
|
|
|
return sam, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// checkMessageToNodes will check that either toHost or toHosts are
|
|
|
|
// specified in the message. If not specified it will drop the message
|
|
|
|
// and send an error.
|
|
|
|
// if toNodes is specified, the original message will be used, and
|
|
|
|
// and an individual message will be created with a toNode field for
|
|
|
|
// each if the toNodes specified.
|
|
|
|
func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
|
|
|
|
msgs := []Message{}
|
2021-08-26 04:08:39 +00:00
|
|
|
|
|
|
|
for _, v := range MsgSlice {
|
|
|
|
switch {
|
|
|
|
// if toNode specified, we don't care about the toHosts.
|
|
|
|
case v.ToNode != "":
|
2021-08-26 04:35:54 +00:00
|
|
|
msgs = append(msgs, v)
|
2021-08-26 04:08:39 +00:00
|
|
|
continue
|
|
|
|
|
|
|
|
// if toNodes specified, we use the original message, and
|
|
|
|
// create new node messages for each of the nodes specified.
|
|
|
|
case len(v.ToNodes) != 0:
|
|
|
|
for _, n := range v.ToNodes {
|
|
|
|
m := v
|
2021-08-26 04:35:54 +00:00
|
|
|
// Set the toNodes field to nil since we're creating
|
|
|
|
// an individual toNode message for each of the toNodes
|
|
|
|
// found, and hence we no longer need that field.
|
2021-08-26 04:08:39 +00:00
|
|
|
m.ToNodes = nil
|
|
|
|
m.ToNode = n
|
2021-08-26 04:35:54 +00:00
|
|
|
msgs = append(msgs, m)
|
2021-08-26 04:08:39 +00:00
|
|
|
}
|
|
|
|
continue
|
|
|
|
|
|
|
|
// No toNode or toNodes specified. Drop the message by not appending it to
|
|
|
|
// the slice since it is not valid.
|
|
|
|
default:
|
2021-09-23 06:19:53 +00:00
|
|
|
er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v)
|
2023-01-11 07:38:15 +00:00
|
|
|
s.errorKernel.errSend(s.processInitial, v, er, logWarning)
|
2021-08-26 04:08:39 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-26 04:35:54 +00:00
|
|
|
return msgs
|
2021-02-03 21:08:28 +00:00
|
|
|
}
|
|
|
|
|
2021-08-25 06:56:44 +00:00
|
|
|
// newSubjectAndMessage will look up the correct values and value types to
|
2021-08-16 11:01:12 +00:00
|
|
|
// be used in a subject for a Message (sam), and return the a combined structure
|
2021-03-10 06:11:14 +00:00
|
|
|
// of type subjectAndMessage.
|
2021-08-25 06:56:44 +00:00
|
|
|
func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
|
2021-03-10 06:11:14 +00:00
|
|
|
// We need to create a tempory method type to look up the kind for the
|
|
|
|
// real method for the message.
|
|
|
|
var mt Method
|
|
|
|
|
2021-03-12 11:08:11 +00:00
|
|
|
tmpH := mt.getHandler(m.Method)
|
|
|
|
if tmpH == nil {
|
2021-11-11 12:43:32 +00:00
|
|
|
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method)
|
|
|
|
}
|
|
|
|
|
|
|
|
switch {
|
|
|
|
case m.ToNode == "":
|
|
|
|
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m)
|
|
|
|
case m.Method == "":
|
|
|
|
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m)
|
2021-03-12 11:08:11 +00:00
|
|
|
}
|
|
|
|
|
2021-03-10 06:11:14 +00:00
|
|
|
sub := Subject{
|
2023-01-04 08:14:04 +00:00
|
|
|
ToNode: string(m.ToNode),
|
|
|
|
Method: m.Method,
|
|
|
|
messageCh: make(chan Message),
|
2021-03-10 06:11:14 +00:00
|
|
|
}
|
|
|
|
|
2021-08-25 06:56:44 +00:00
|
|
|
sam := subjectAndMessage{
|
2021-03-10 06:11:14 +00:00
|
|
|
Subject: sub,
|
|
|
|
Message: m,
|
|
|
|
}
|
|
|
|
|
2021-08-25 06:56:44 +00:00
|
|
|
return sam, nil
|
2021-03-10 06:11:14 +00:00
|
|
|
}
|