1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Changed message logic to handle REQSub sub requests

This commit is contained in:
postmannen 2022-06-14 07:05:38 +02:00
parent 537e7886dd
commit 1895fcb398
5 changed files with 110 additions and 26 deletions

View file

@ -124,7 +124,8 @@ type Subject struct {
func newSubject(method Method, node string) Subject {
// Get the Event type for the Method.
ma := method.GetMethodsAvailable()
mh, ok := ma.Methodhandlers[method]
mh, ok := ma.CheckIfExists(method)
//mh, ok := ma.Methodhandlers[method]
if !ok {
log.Printf("error: no Event type specified for the method: %v\n", method)
os.Exit(1)

View file

@ -696,7 +696,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
go p.publishAMessage(m, zEnc, once, natsConn)
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
log.Printf("%v\n", er)
return

View file

@ -24,10 +24,17 @@
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// that constant as the key in the MethodsAvailable map, where
// the value is the actual type you want to map it to with a
// handler method. You also specify if it is a Command or Event,
// and if it is ACK or NACK.
//
// Requests used in sub processes should always start with the
// naming REQSUB. Since the method of a sub process are defined
// within the method handler of the owning reqest type we should
// use the methodREQSUB for these types. The methodREQSUB handler
// does nothing.
//
// Check out the existing code below for more examples.
package steward
@ -36,6 +43,7 @@ import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
)
@ -95,11 +103,16 @@ const (
REQCopyFileFrom Method = "REQCopyFileFrom"
// Write the destination copied to some node.
REQCopyFileTo Method = "REQCopyFileTo"
// Send Hello I'm here message.
// Read the source file to be copied to some node.
// Initial request for file copying.
// Initiated by the user.
REQCopySrc Method = "REQCopySrc"
// Write the destination copied to some node.
// Initial request for file copying.
// Generated by the source to send initial information to the destination.
REQCopyDst Method = "REQCopyDst"
// Read the source file to be copied to some node.
REQSubCopySrc Method = "REQSubCopySrc"
// Write the destination copied to some node.
REQSubCopyDst Method = "REQSubCopyDst"
// Send Hello I'm here message.
REQHello Method = "REQHello"
// Error log methods to centralError node.
@ -227,6 +240,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQCopyDst: methodREQCopyDst{
event: EventACK,
},
REQSubCopySrc: methodREQSub{
event: EventACK,
},
REQSubCopyDst: methodREQSub{
event: EventACK,
},
REQHello: methodREQHello{
event: EventNACK,
},
@ -370,6 +389,26 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([
// ----
// place holder method used for sub processes.
// Methods used in sub processes are defined within the the requests
// they are spawned in, so this type is primarily for us to use the
// same logic with sub process requests as we do with normal requests.
type methodREQSub struct {
event Event
}
func (m methodREQSub) getKind() Event {
return m.event
}
func (m methodREQSub) handler(proc process, message Message, node string) ([]byte, error) {
// proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
type MethodsAvailable struct {
@ -380,6 +419,13 @@ type MethodsAvailable struct {
// value will be set to true, and the methodHandler function for that type
// will be returned.
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
// First check if it is a sub process.
if strings.HasPrefix(string(m), "REQSub") {
// Strip of the uuid after the method name.
sp := strings.Split(string(m), ".")
m = Method(sp[0])
}
mFunc, ok := ma.Methodhandlers[m]
if ok {
return mFunc, true

View file

@ -91,18 +91,19 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Create a subject for one copy request
uid := uuid.New()
subProcessName = fmt.Sprintf("copySrc.%v", uid.String())
subProcessName = fmt.Sprintf("REQSubCopySrc.%v", uid.String())
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
m := Method(subProcessName)
cia := copyInitialData{
UUID: uid.String(),
DstDir: dstDir,
DstFile: dstFile,
UUID: uid.String(),
SrcMethod: m,
DstDir: dstDir,
DstFile: dstFile,
}
m := Method(subProcessName)
sub := newSubjectNoVerifyHandler(m, node)
// Create a new sub process that will do the actual file copying.
@ -110,10 +111,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc, cia)
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia)
// assign a handler to the sub process
copySrcSubProc.handler = copySrcHandler(cia)
copySrcSubProc.handler = copySrcSubHandler(cia)
// The process will be killed when the context expires.
go copySrcSubProc.spawnWorker()
@ -157,9 +158,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
}
type copyInitialData struct {
UUID string
DstDir string
DstFile string
UUID string
SrcMethod Method
SrcNode Node
DstMethod Method
DstNode Node
DstDir string
DstFile string
}
// ----
@ -201,7 +206,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
// Create a subject for one copy request
subProcessName = fmt.Sprintf("copyDst.%v", cia.UUID)
subProcessName = fmt.Sprintf("REQSubCopyDst.%v", cia.UUID)
m := Method(subProcessName)
sub := newSubjectNoVerifyHandler(m, node)
@ -211,10 +216,10 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc, cia)
copyDstSubProc.procFunc = copyDstProcSubFunc(copyDstSubProc, cia, message)
// assign a handler to the sub process
copyDstSubProc.handler = copyDstHandler(cia)
copyDstSubProc.handler = copyDstSubHandler(cia)
// The process will be killed when the context expires.
go copyDstSubProc.spawnWorker()
@ -230,11 +235,13 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
return ackMsg, nil
}
func copySrcHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
// HERE!
// We should receive a ready message generated by the procFunc of Dst.
fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE------------------\n\n")
select {
case <-proc.ctx.Done():
log.Printf(" * copySrcHandler ended: %v\n", proc.processName)
@ -246,8 +253,9 @@ func copySrcHandler(cia copyInitialData) func(process, Message, string) ([]byte,
return h
}
func copyDstHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
select {
case <-proc.ctx.Done():
log.Printf(" * copyDstHandler ended: %v\n", proc.processName)
@ -259,7 +267,7 @@ func copyDstHandler(cia copyInitialData) func(process, Message, string) ([]byte,
return h
}
func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
select {
@ -273,10 +281,37 @@ func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, ch
return pf
}
func copyDstProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
type copyStatus int
const (
copyReady copyStatus = iota
)
func copyDstProcSubFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia)
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending to:%v\n ", message.FromNode)
msg := Message{
ToNode: message.FromNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
}
sub := Subject{
ToNode: string(message.FromNode),
Event: EventACK,
Method: cia.SrcMethod,
}
sam := subjectAndMessage{
Subject: sub,
Message: msg,
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
select {
case <-ctx.Done():
log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName)

View file

@ -437,6 +437,8 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// Signal back to the ringbuffer that message have been picked up.
samDBVal.delivered()
// TODO HERE!: The message will be dropped here since the method for copy uid does not exist
sam := samDBVal.samDBValue.Data
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {