diff --git a/read_socket_or_tcp_listener.go b/read_socket_or_tcp_listener.go index 9842dc1..b4cff27 100644 --- a/read_socket_or_tcp_listener.go +++ b/read_socket_or_tcp_listener.go @@ -139,14 +139,44 @@ func (s *server) writeStewSocket(toStewSocketCh []byte) { } func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) { - b, err := io.ReadAll(r.Body) - if err != nil { - er := fmt.Errorf("error: readHTTPListenerHandler: %v", err) - sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er) - } - r.Body.Close() - log.Printf("got: %v\n", string(b)) + var readBytes []byte + + for { + b := make([]byte, 1500) + _, err := r.Body.Read(b) + if err != nil && err != io.EOF { + er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) + sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er) + return + } + + readBytes = append(readBytes, b...) + + if err == io.EOF { + break + } + } + + readBytes = bytes.Trim(readBytes, "\x00") + + // unmarshal the JSON into a struct + sam, err := s.convertBytesToSAMs(readBytes) + if err != nil { + er := fmt.Errorf("error: malformed json: %v", err) + sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er) + return + } + + for i := range sam { + + // Fill in the value for the FromNode field, so the receiver + // can check this field to know where it came from. + sam[i].Message.FromNode = Node(s.nodeName) + } + + // Send the SAM struct to be picked up by the ring buffer. + s.newMessagesCh <- sam }