1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

implemented tcp listener for new messages

This commit is contained in:
postmannen 2021-08-23 16:00:48 +02:00
parent 83b9c60e90
commit b77f04bd5d
3 changed files with 77 additions and 0 deletions

View file

@ -81,6 +81,8 @@ type Configuration struct {
ConfigFolder string
// The folder where the socket file should live
SocketFolder string
// TCP Listener for sending messages to the system
TCPListener string
// The folder where the database should live
DatabaseFolder string
// some unique string to identify this Edge unit
@ -146,6 +148,7 @@ func newConfigurationDefaults() Configuration {
c := Configuration{
ConfigFolder: "/usr/local/steward/etc/",
SocketFolder: "./tmp",
TCPListener: "",
DatabaseFolder: "./var/lib",
BrokerAddress: "127.0.0.1:4222",
ProfilingPort: "",
@ -197,6 +200,7 @@ func (c *Configuration) CheckFlags() error {
//flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/steward/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER")
flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.")
flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost")
flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.")
flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit")
flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker")

View file

@ -4,7 +4,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"os"
)
// readSocket will read the .sock file specified.
@ -52,6 +55,71 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
}
}
// readTCPListener wait and read messages delivered on the TCP
// port if started.
// It will take a channel of []byte as input, and it is in this
// channel the content of a file that has changed is returned.
func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
ln, err := net.Listen("tcp", s.configuration.TCPListener)
if err != nil {
log.Printf("error: readTCPListener: failed to start tcp listener: %v\n", err)
os.Exit(1)
}
// Loop, and wait for new connections.
for {
conn, err := ln.Accept()
if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
continue
}
go func(conn net.Conn) {
defer conn.Close()
var readBytes []byte
for {
b := make([]byte, 1500)
_, err = conn.Read(b)
if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
return
}
readBytes = append(readBytes, b...)
if err == io.EOF {
break
}
}
readBytes = bytes.Trim(readBytes, "\x00")
// unmarshal the JSON into a struct
sam, err := convertBytesToSAM(readBytes)
if err != nil {
er := fmt.Errorf("error: malformed json: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
return
}
for i := range sam {
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
sam[i].Message.FromNode = Node(s.nodeName)
}
// Send the SAM struct to be picked up by the ring buffer.
toRingbufferCh <- sam
}(conn)
}
}
// TODO: Create the writer go routine for this socket.
func (s *server) writeStewSocket(toStewSocketCh []byte) {
//s.StewSockListener

View file

@ -222,6 +222,11 @@ func (s *server) Start() {
// Start the checking the input socket for new messages from operator.
go s.readSocket(s.toRingbufferCh)
// Check if we should start the tcp listener fro new messages from operator.
if s.configuration.TCPListener != "" {
go s.readTCPListener(s.toRingbufferCh)
}
// Start up the predefined subscribers.
//
// Since all the logic to handle processes are tied to the process