From 7229f33cacaabea9d25671a2b02429ce82f451e0 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 10 Sep 2021 05:26:16 +0200 Subject: [PATCH] initial http reader for messages --- configuration_flags.go | 10 ++++++++++ read_socket_or_tcp_listener.go | 33 +++++++++++++++++++++++++++++++++ server.go | 7 ++++++- 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/configuration_flags.go b/configuration_flags.go index 4a1353e..5439c12 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -23,6 +23,8 @@ type Configuration struct { SocketFolder string // TCP Listener for sending messages to the system TCPListener string + // HTTP Listener for sending messages to the system + HTTPListener string // The folder where the database should live DatabaseFolder string // some unique string to identify this Edge unit @@ -90,6 +92,7 @@ type ConfigurationFromFile struct { ConfigFolder *string SocketFolder *string TCPListener *string + HTTPListener *string DatabaseFolder *string NodeName *string BrokerAddress *string @@ -132,6 +135,7 @@ func newConfigurationDefaults() Configuration { ConfigFolder: "./etc/", SocketFolder: "./tmp", TCPListener: "", + HTTPListener: "", DatabaseFolder: "./var/lib", BrokerAddress: "127.0.0.1:4222", NatsConnectRetryInterval: 10, @@ -185,6 +189,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.TCPListener = *cf.TCPListener } + if cf.HTTPListener == nil { + conf.HTTPListener = cd.HTTPListener + } else { + conf.HTTPListener = *cf.HTTPListener + } if cf.DatabaseFolder == nil { conf.DatabaseFolder = cd.DatabaseFolder } else { @@ -360,6 +369,7 @@ func (c *Configuration) CheckFlags() error { //flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/steward/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER") flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.") flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") + flag.StringVar(&c.HTTPListener, "httpListener", fc.HTTPListener, "start up a HTTP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.") flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit") flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker") diff --git a/read_socket_or_tcp_listener.go b/read_socket_or_tcp_listener.go index d94f336..f8e456c 100644 --- a/read_socket_or_tcp_listener.go +++ b/read_socket_or_tcp_listener.go @@ -7,7 +7,10 @@ import ( "io" "log" "net" + "net/http" "os" + + "github.com/prometheus/client_golang/prometheus/promhttp" ) // readSocket will read the .sock file specified. @@ -137,6 +140,36 @@ func (s *server) writeStewSocket(toStewSocketCh []byte) { //s.StewSockListener } +func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + er := fmt.Errorf("error: readHTTPListenerHandler: %v", err) + sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er) + } + r.Body.Close() + + log.Printf("got: %v\n", string(b)) + +} + +func (s *server) readHttpListener() { + go func() { + n, err := net.Listen("tcp", s.configuration.HTTPListener) + if err != nil { + log.Printf("error: startMetrics: failed to open prometheus listen port: %v\n", err) + os.Exit(1) + } + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + err = http.Serve(n, mux) + if err != nil { + log.Printf("error: startMetrics: failed to start http.Serve: %v\n", err) + os.Exit(1) + } + }() +} + // The subject are made up of different parts of the message field. // To make things easier and to avoid figuring out what the subject // is in all places we've created the concept of subjectAndMessage diff --git a/server.go b/server.go index 75bd0f4..f6ad0e6 100644 --- a/server.go +++ b/server.go @@ -214,11 +214,16 @@ func (s *server) Start() { // Start the checking the input socket for new messages from operator. go s.readSocket() - // Check if we should start the tcp listener fro new messages from operator. + // Check if we should start the tcp listener for new messages from operator. if s.configuration.TCPListener != "" { go s.readTCPListener() } + // Check if we should start the http listener for new messages from operator. + if s.configuration.HTTPListener != "" { + go s.readHttpListener() + } + // Start up the predefined subscribers. // // Since all the logic to handle processes are tied to the process