From 61e2343f189e57e8c9fb78087c86dd26ddd61b81 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 24 Feb 2021 15:43:31 +0100 Subject: [PATCH] refactored, and implemented error messages to central --- cmd/main.go | 3 +- errorkernel.go | 20 +++++- incommmingBuffer.db | Bin 32768 -> 32768 bytes message-and-subject.go | 78 ++++++++++++++++++++ publisher.go | 8 ++- ringbuffer.go | 2 + server.go | 123 +++++++++++--------------------- subscriberMethodTypes.go | 14 ++++ subscriber.go => subscribers.go | 11 +++ 9 files changed, 174 insertions(+), 85 deletions(-) create mode 100644 message-and-subject.go rename subscriber.go => subscribers.go (82%) diff --git a/cmd/main.go b/cmd/main.go index 71c177a..6dab5cb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,6 +16,7 @@ func main() { brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker") profilingPort := flag.String("profilingPort", "", "The number of the profiling port") promHostAndPort := flag.String("promHostAndPort", ":2112", "host and port for prometheus listener, e.g. localhost:2112") + centralErrorLogger := flag.Bool("centralErrorLogger", false, "seet to true if this is the node that should receive the error log's from other nodes") //isCentral := flag.Bool("isCentral", false, "used to indicate that this is the central master that will subscribe to error message subjects") flag.Parse() @@ -28,7 +29,7 @@ func main() { } - s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort) + s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger) if err != nil { log.Printf("error: failed to connect to broker: %v\n", err) os.Exit(1) diff --git a/errorkernel.go b/errorkernel.go index 631848d..c640dc3 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -40,7 +40,7 @@ func newErrorKernel() *errorKernel { // process if it should continue or not based not based on how severe // the error where. This should be right after sending the error // sending in the process. -func (e *errorKernel) startErrorKernel() { +func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage) { // TODO: For now it will just print the error messages to the // console. go func() { @@ -63,6 +63,24 @@ func (e *errorKernel) startErrorKernel() { // log.Printf("*** error_kernel: %#v, type=%T\n", er, er) log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n") + + // // TESTING: Creating an error message to send to errorCentral + // fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") + // sam := subjectAndMessage{ + // Subject: Subject{ + // ToNode: "errorCentral", + // CommandOrEvent: EventNACK, + // Method: ErrorLog, + // }, + // Message: Message{ + // ToNode: "errorCentral", + // Data: []string{"some tull here .............."}, + // CommandOrEvent: EventNACK, + // Method: ErrorLog, + // }, + // } + // newMessagesCh <- []subjectAndMessage{sam} + er.errorActionCh <- errActionContinue }() } diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 7a3a58bd10b32a0ea5c6a1884c4a9d659a095ef1..2dd7e376302d118764dfe520a778a7974877366a 100644 GIT binary patch delta 170 zcmZo@U}|V!n&2Q%zyJX;ty0QThMWzX6%97IqZbn-6> GFaQ87elNWM delta 184 zcmZo@U}|V!n&2Q1!TuS zD(NU?rYKn%=qRNmmLw`!MJpvG7H0rOb(4WaUTR5kNn(kDZem_ddQobDf^MclPG)gQ VYMxT;