diff --git a/read_socket_or_tcp_listener.go b/read_socket_or_tcp_listener.go index 21fe9a4..700e2a5 100644 --- a/read_socket_or_tcp_listener.go +++ b/read_socket_or_tcp_listener.go @@ -13,14 +13,14 @@ import ( // readSocket will read the .sock file specified. // It will take a channel of []byte as input, and it is in this // channel the content of a file that has changed is returned. -func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { +func (s *server) readSocket() { // Loop, and wait for new connections. for { conn, err := s.StewardSocket.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) + sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) } go func(conn net.Conn) { @@ -33,7 +33,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { _, err = conn.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) + sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) return } @@ -47,22 +47,22 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sam, err := convertBytesToSAM(readBytes) + sams, err := convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json: %v", err) - sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) + sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) return } - for i := range sam { + for i := range sams { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sam[i].Message.FromNode = Node(s.nodeName) + sams[i].Message.FromNode = Node(s.nodeName) } // Send the SAM struct to be picked up by the ring buffer. - toRingbufferCh <- sam + s.newMessagesCh <- sams }(conn) } @@ -112,7 +112,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) { readBytes = bytes.Trim(readBytes, "\x00") // unmarshal the JSON into a struct - sam, err := convertBytesToSAM(readBytes) + sam, err := convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json: %v", err) sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) @@ -148,11 +148,11 @@ type subjectAndMessage struct { Message `json:"message" yaml:"message"` } -// convertBytesToSAM will range over the byte representing a message given in +// convertBytesToSAMs will range over the byte representing a message given in // json format. For each element found the Message type will be converted into // a SubjectAndMessage type value and appended to a slice, and the slice is // returned to the caller. -func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) { +func convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { MsgSlice := []Message{} err := json.Unmarshal(b, &MsgSlice) diff --git a/server.go b/server.go index 14ff7c1..8863f63 100644 --- a/server.go +++ b/server.go @@ -221,7 +221,7 @@ func (s *server) Start() { }() // Start the checking the input socket for new messages from operator. - go s.readSocket(s.newMessagesCh) + go s.readSocket() // Check if we should start the tcp listener fro new messages from operator. if s.configuration.TCPListener != "" { @@ -340,8 +340,8 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // Start reading new fresh messages received on the incomming message // pipe/file requested, and fill them into the buffer. go func() { - for samSlice := range s.newMessagesCh { - for _, sam := range samSlice { + for sams := range s.newMessagesCh { + for _, sam := range sams { ringBufferInCh <- sam } }