mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 12:59:15 +00:00
corrected the reading of the steward socket
This commit is contained in:
parent
b77f04bd5d
commit
b332e13c61
1 changed files with 36 additions and 23 deletions
|
@ -23,35 +23,48 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, 65535)
|
go func(conn net.Conn) {
|
||||||
_, err = conn.Read(b)
|
defer conn.Close()
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: failed to read data from socket: %v", err)
|
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
b = bytes.Trim(b, "\x00")
|
var readBytes []byte
|
||||||
|
|
||||||
// unmarshal the JSON into a struct
|
for {
|
||||||
sam, err := convertBytesToSAM(b)
|
b := make([]byte, 1500)
|
||||||
if err != nil {
|
_, err = conn.Read(b)
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
if err != nil && err != io.EOF {
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||||
continue
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for i := range sam {
|
readBytes = append(readBytes, b...)
|
||||||
|
|
||||||
// Fill in the value for the FromNode field, so the receiver
|
if err == io.EOF {
|
||||||
// can check this field to know where it came from.
|
break
|
||||||
sam[i].Message.FromNode = Node(s.nodeName)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
readBytes = bytes.Trim(readBytes, "\x00")
|
||||||
toRingbufferCh <- sam
|
|
||||||
|
|
||||||
conn.Close()
|
// unmarshal the JSON into a struct
|
||||||
|
sam, err := convertBytesToSAM(readBytes)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range sam {
|
||||||
|
|
||||||
|
// Fill in the value for the FromNode field, so the receiver
|
||||||
|
// can check this field to know where it came from.
|
||||||
|
sam[i].Message.FromNode = Node(s.nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
|
toRingbufferCh <- sam
|
||||||
|
|
||||||
|
}(conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue