From 00b439d6d2fe81c113f8e34b835b068433f98c5f Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 1 Jul 2021 10:05:34 +0200 Subject: [PATCH] Added initital structure for methodREQToSocket --- startup_processes.go | 10 ++++++++++ subscriber_method_types.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/startup_processes.go b/startup_processes.go index 8c470c7..5234308 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -79,6 +79,8 @@ func (p process) ProcessesStart() { if p.configuration.StartSubREQTailFile.OK { p.startup.subREQTailFile(p) } + + p.startup.subREQToSocket(p) } // --------------------------------------------------------------------------------------- @@ -249,3 +251,11 @@ func (s startup) subREQTailFile(p process) { // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } + +func (s startup) subREQToSocket(p process) { + log.Printf("Starting write to socket subscriber: %#v\n", p.node) + sub := newSubject(REQToSocket, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) +} diff --git a/subscriber_method_types.go b/subscriber_method_types.go index af9caff..d756edb 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -121,6 +121,8 @@ const ( REQHttpGet Method = "REQHttpGet" // Tail file REQTailFile Method = "REQTailFile" + // Write to steward socket + REQToSocket Method = "REQToSocket" ) // The mapping of all the method constants specified, what type @@ -179,6 +181,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQTailFile: methodREQTailFile{ commandOrEvent: EventACK, }, + REQToSocket: methodREQToSocket{ + commandOrEvent: EventACK, + }, }, } @@ -189,7 +194,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { // the Stew client for knowing what of the req types are generally // used as reply methods. func (m Method) GetReplyMethods() []Method { - rm := []Method{REQToConsole, REQToFile, REQToFileAppend} + rm := []Method{REQToConsole, REQToFile, REQToFileAppend, REQToSocket} return rm } @@ -1096,3 +1101,26 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +// --- + +type methodREQToSocket struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQToSocket) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) { + + for _, d := range message.Data { + // Write the data to the socket here. + fmt.Printf("Info: Data to write to socket: %v\n", d) + } + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ----