mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 10:57:42 +00:00
adding sending node name to messages
This commit is contained in:
parent
2594495668
commit
042aee0c29
2 changed files with 8 additions and 2 deletions
|
@ -14,7 +14,7 @@ import (
|
||||||
// getMessagesFromFile will start a file watcher for the given directory
|
// getMessagesFromFile will start a file watcher for the given directory
|
||||||
// and filename. It will take a channel of []byte as input, and it is
|
// and filename. It will take a channel of []byte as input, and it is
|
||||||
// in this channel the content of a file that has changed is returned.
|
// in this channel the content of a file that has changed is returned.
|
||||||
func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) {
|
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) {
|
||||||
fileUpdated := make(chan bool)
|
fileUpdated := make(chan bool)
|
||||||
go fileWatcherStart(directoryToCheck, fileUpdated)
|
go fileWatcherStart(directoryToCheck, fileUpdated)
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh
|
||||||
|
|
||||||
for i := range js {
|
for i := range js {
|
||||||
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind)
|
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind)
|
||||||
|
js[i].Message.FromNode = node(s.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the data back to be consumed
|
// Send the data back to be consumed
|
||||||
|
|
|
@ -50,6 +50,7 @@ type Message struct {
|
||||||
Data []string `json:"data" yaml:"data"`
|
Data []string `json:"data" yaml:"data"`
|
||||||
// The type of the message being sent
|
// The type of the message being sent
|
||||||
MessageType MessageKind `json:"messageType" yaml:"messageType"`
|
MessageType MessageKind `json:"messageType" yaml:"messageType"`
|
||||||
|
FromNode node
|
||||||
}
|
}
|
||||||
|
|
||||||
// server is the structure that will hold the state about spawned
|
// server is the structure that will hold the state about spawned
|
||||||
|
@ -99,7 +100,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
|
||||||
|
|
||||||
func (s *server) PublisherStart() {
|
func (s *server) PublisherStart() {
|
||||||
// Start the checking the input file for new messages from operator.
|
// Start the checking the input file for new messages from operator.
|
||||||
go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
|
go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
|
||||||
|
|
||||||
// Prepare and start a single process
|
// Prepare and start a single process
|
||||||
//{
|
//{
|
||||||
|
@ -138,6 +139,10 @@ func (s *server) handleNewOperatorMessages() {
|
||||||
go func() {
|
go func() {
|
||||||
for v := range s.newMessagesCh {
|
for v := range s.newMessagesCh {
|
||||||
for i, vv := range v {
|
for i, vv := range v {
|
||||||
|
|
||||||
|
// Adding a label here so we are able to redo the sending
|
||||||
|
// of the last message if a process with specified subject
|
||||||
|
// is not present.
|
||||||
redo:
|
redo:
|
||||||
m := vv.Message
|
m := vv.Message
|
||||||
subjName := vv.Subject.name()
|
subjName := vv.Subject.name()
|
||||||
|
|
Loading…
Add table
Reference in a new issue