mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 04:49:17 +00:00
renamed variables
This commit is contained in:
parent
7367710747
commit
2a182e78a2
2 changed files with 14 additions and 14 deletions
|
@ -13,14 +13,14 @@ import (
|
||||||
// readSocket will read the .sock file specified.
|
// readSocket will read the .sock file specified.
|
||||||
// It will take a channel of []byte as input, and it is in this
|
// It will take a channel of []byte as input, and it is in this
|
||||||
// channel the content of a file that has changed is returned.
|
// channel the content of a file that has changed is returned.
|
||||||
func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
func (s *server) readSocket() {
|
||||||
|
|
||||||
// Loop, and wait for new connections.
|
// Loop, and wait for new connections.
|
||||||
for {
|
for {
|
||||||
conn, err := s.StewardSocket.Accept()
|
conn, err := s.StewardSocket.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(conn net.Conn) {
|
go func(conn net.Conn) {
|
||||||
|
@ -33,7 +33,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
_, err = conn.Read(b)
|
_, err = conn.Read(b)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,22 +47,22 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
readBytes = bytes.Trim(readBytes, "\x00")
|
readBytes = bytes.Trim(readBytes, "\x00")
|
||||||
|
|
||||||
// unmarshal the JSON into a struct
|
// unmarshal the JSON into a struct
|
||||||
sam, err := convertBytesToSAM(readBytes)
|
sams, err := convertBytesToSAMs(readBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range sam {
|
for i := range sams {
|
||||||
|
|
||||||
// Fill in the value for the FromNode field, so the receiver
|
// Fill in the value for the FromNode field, so the receiver
|
||||||
// can check this field to know where it came from.
|
// can check this field to know where it came from.
|
||||||
sam[i].Message.FromNode = Node(s.nodeName)
|
sams[i].Message.FromNode = Node(s.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
toRingbufferCh <- sam
|
s.newMessagesCh <- sams
|
||||||
|
|
||||||
}(conn)
|
}(conn)
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
|
||||||
readBytes = bytes.Trim(readBytes, "\x00")
|
readBytes = bytes.Trim(readBytes, "\x00")
|
||||||
|
|
||||||
// unmarshal the JSON into a struct
|
// unmarshal the JSON into a struct
|
||||||
sam, err := convertBytesToSAM(readBytes)
|
sam, err := convertBytesToSAMs(readBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
|
@ -148,11 +148,11 @@ type subjectAndMessage struct {
|
||||||
Message `json:"message" yaml:"message"`
|
Message `json:"message" yaml:"message"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertBytesToSAM will range over the byte representing a message given in
|
// convertBytesToSAMs will range over the byte representing a message given in
|
||||||
// json format. For each element found the Message type will be converted into
|
// json format. For each element found the Message type will be converted into
|
||||||
// a SubjectAndMessage type value and appended to a slice, and the slice is
|
// a SubjectAndMessage type value and appended to a slice, and the slice is
|
||||||
// returned to the caller.
|
// returned to the caller.
|
||||||
func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) {
|
func convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
||||||
MsgSlice := []Message{}
|
MsgSlice := []Message{}
|
||||||
|
|
||||||
err := json.Unmarshal(b, &MsgSlice)
|
err := json.Unmarshal(b, &MsgSlice)
|
||||||
|
|
|
@ -221,7 +221,7 @@ func (s *server) Start() {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Start the checking the input socket for new messages from operator.
|
// Start the checking the input socket for new messages from operator.
|
||||||
go s.readSocket(s.newMessagesCh)
|
go s.readSocket()
|
||||||
|
|
||||||
// Check if we should start the tcp listener fro new messages from operator.
|
// Check if we should start the tcp listener fro new messages from operator.
|
||||||
if s.configuration.TCPListener != "" {
|
if s.configuration.TCPListener != "" {
|
||||||
|
@ -340,8 +340,8 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// Start reading new fresh messages received on the incomming message
|
// Start reading new fresh messages received on the incomming message
|
||||||
// pipe/file requested, and fill them into the buffer.
|
// pipe/file requested, and fill them into the buffer.
|
||||||
go func() {
|
go func() {
|
||||||
for samSlice := range s.newMessagesCh {
|
for sams := range s.newMessagesCh {
|
||||||
for _, sam := range samSlice {
|
for _, sam := range sams {
|
||||||
ringBufferInCh <- sam
|
ringBufferInCh <- sam
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue