mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-20 22:52:13 +00:00
Transer of files initially works
This commit is contained in:
parent
49a88b83ce
commit
7c991ee3d5
3 changed files with 235 additions and 0 deletions
|
@ -70,6 +70,8 @@ type Configuration struct {
|
|||
StartSubREQToFileAppend bool
|
||||
// Subscriber for writing to file
|
||||
StartSubREQToFile bool
|
||||
// Subscriber for reading files to copy
|
||||
StartSubREQCopyFileFrom bool
|
||||
// Subscriber for Echo Request
|
||||
StartSubREQPing bool
|
||||
// Subscriber for Echo Reply
|
||||
|
@ -117,6 +119,7 @@ type ConfigurationFromFile struct {
|
|||
StartSubREQHello *bool
|
||||
StartSubREQToFileAppend *bool
|
||||
StartSubREQToFile *bool
|
||||
StartSubREQCopyFileFrom *bool
|
||||
StartSubREQPing *bool
|
||||
StartSubREQPong *bool
|
||||
StartSubREQCliCommand *bool
|
||||
|
@ -160,6 +163,7 @@ func newConfigurationDefaults() Configuration {
|
|||
StartSubREQHello: true,
|
||||
StartSubREQToFileAppend: true,
|
||||
StartSubREQToFile: true,
|
||||
StartSubREQCopyFileFrom: true,
|
||||
StartSubREQPing: true,
|
||||
StartSubREQPong: true,
|
||||
StartSubREQCliCommand: true,
|
||||
|
@ -298,6 +302,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
|||
} else {
|
||||
conf.StartSubREQToFile = *cf.StartSubREQToFile
|
||||
}
|
||||
if cf.StartSubREQCopyFileFrom == nil {
|
||||
conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom
|
||||
} else {
|
||||
conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom
|
||||
}
|
||||
if cf.StartSubREQPing == nil {
|
||||
conf.StartSubREQPing = cd.StartSubREQPing
|
||||
} else {
|
||||
|
@ -396,6 +405,7 @@ func (c *Configuration) CheckFlags() error {
|
|||
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false")
|
||||
|
|
13
processes.go
13
processes.go
|
@ -107,6 +107,11 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subREQToFile(proc)
|
||||
}
|
||||
|
||||
// Start a subscriber for reading file to copy
|
||||
if proc.configuration.StartSubREQCopyFileFrom {
|
||||
proc.startup.subREQCopyFileFrom(proc)
|
||||
}
|
||||
|
||||
// Start a subscriber for Hello messages
|
||||
if proc.configuration.StartSubREQHello {
|
||||
proc.startup.subREQHello(proc)
|
||||
|
@ -320,6 +325,14 @@ func (s startup) subREQToFile(p process) {
|
|||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQCopyFileFrom(p process) {
|
||||
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCopyFileFrom, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQToFileAppend(p process) {
|
||||
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToFileAppend, string(p.node))
|
||||
|
|
212
requests.go
212
requests.go
|
@ -98,6 +98,10 @@ const (
|
|||
// The data field is a slice of strings where the values of the
|
||||
// slice will be written to the file.
|
||||
REQToFile Method = "REQToFile"
|
||||
// Read the source file to be copied to some node.
|
||||
REQCopyFileFrom Method = "REQCopyFileFrom"
|
||||
// Write the destination copied to some node.
|
||||
REQCopyFileTo Method = "REQCopyFileTo"
|
||||
// Send Hello I'm here message.
|
||||
REQHello Method = "REQHello"
|
||||
// Error log methods to centralError node.
|
||||
|
@ -159,6 +163,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
REQToFile: methodREQToFile{
|
||||
commandOrEvent: EventACK,
|
||||
},
|
||||
REQCopyFileFrom: methodREQCopyFileFrom{
|
||||
commandOrEvent: EventACK,
|
||||
},
|
||||
REQCopyFileTo: methodREQCopyFileTo{
|
||||
commandOrEvent: EventACK,
|
||||
},
|
||||
REQHello: methodREQHello{
|
||||
commandOrEvent: EventNACK,
|
||||
},
|
||||
|
@ -642,6 +652,208 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
type methodREQCopyFileFrom struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodREQCopyFileFrom) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
// exist.
|
||||
func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
||||
SrcFilePath := message.MethodArgs[0]
|
||||
DstNode := message.MethodArgs[1]
|
||||
DstFilePath := message.MethodArgs[2]
|
||||
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*2)
|
||||
|
||||
outCh := make(chan []byte)
|
||||
errCh := make(chan error)
|
||||
|
||||
// Read the file, and put the result on the out channel to be sent when done reading.
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
fmt.Printf(" * DEBUG: beginning of read file go routine\n")
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
const natsMaxMsgSize = 1000000
|
||||
|
||||
fi, err := os.Stat(SrcFilePath)
|
||||
|
||||
// Check if the src file exists, and that it is not bigger than
|
||||
// the default limit used by nats which is 1MB.
|
||||
switch {
|
||||
case os.IsNotExist(err):
|
||||
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
|
||||
return
|
||||
case fi.Size() > natsMaxMsgSize:
|
||||
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
|
||||
return
|
||||
}
|
||||
|
||||
fh, err := os.Open(SrcFilePath)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf(" * DEBUG: before io.ReadAll\n")
|
||||
|
||||
b, err := io.ReadAll(fh)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf(" * DEBUG: after io.ReadAll: b contains: %v\n", string(b))
|
||||
|
||||
select {
|
||||
case outCh <- b:
|
||||
fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
|
||||
case <-ctx.Done():
|
||||
fmt.Printf(" * DEBUG: after io.ReadAll: ctx.Done\n")
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait here until we got the data to send, then create a new message
|
||||
// and send it.
|
||||
// Also checking the ctx.Done which calls Cancel will allow us to
|
||||
// kill all started go routines started by this message.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Printf(" ** DEBUG: got ctx.Done\n")
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
return
|
||||
case er := <-errCh:
|
||||
fmt.Printf(" ** DEBUG: received on errCh: <-errCh\n")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
cancel()
|
||||
|
||||
return
|
||||
case out := <-outCh:
|
||||
fmt.Printf(" ** DEBUG: got data on out channel: case out:=<-outCh\n")
|
||||
cancel()
|
||||
|
||||
dstDir := filepath.Dir(DstFilePath)
|
||||
dstFile := filepath.Base(DstFilePath)
|
||||
|
||||
// Prepare for sending a new message with the output
|
||||
// TODO: Maybe changing the type of Message.Data to []byte ?
|
||||
|
||||
// Copy the original message to get the defaults for timeouts etc,
|
||||
// and set new values for fields to change.
|
||||
msg := message
|
||||
msg.ToNode = Node(DstNode)
|
||||
msg.Method = REQToFile
|
||||
// TODO: msg.Method = REQCopyFileTo
|
||||
msg.Data = []string{string(out)}
|
||||
msg.Directory = dstDir
|
||||
msg.FileName = dstFile
|
||||
|
||||
// Create SAM and put the message on the send new message channel.
|
||||
|
||||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
fmt.Printf(" * DEBUG: sending SAM: %#v\n", sam)
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
// TODO: Should we also send a reply message with the result back
|
||||
// to where the message originated ?
|
||||
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
type methodREQCopyFileTo struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodREQCopyFileTo) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
// exist.
|
||||
// Same as the REQToFile, but this requst type don't use the default data folder path
|
||||
// for where to store files or add information about node names.
|
||||
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
// If it was a request type message we want to check what the initial messages
|
||||
// method, so we can use that in creating the file name to store the data.
|
||||
fileName, folderTree := message.FileName, message.Directory
|
||||
|
||||
// Check if folder structure exist, if not create it.
|
||||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
return nil, er
|
||||
}
|
||||
|
||||
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
for _, d := range message.Data {
|
||||
_, err := f.Write([]byte(d))
|
||||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ----
|
||||
type methodREQHello struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
|
|
Loading…
Add table
Reference in a new issue