1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added initial write to socket

This commit is contained in:
postmannen 2023-05-31 09:29:00 +02:00
parent 16ec026453
commit dcefbff670
3 changed files with 38 additions and 87 deletions

View file

@ -1,41 +0,0 @@
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0770)
if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return nil, er
}
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.errorKernel.logDebug(er, proc.configuration)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return nil, err
}
defer f.Close()
_, err = f.Write(message.Data)
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}

View file

@ -1,38 +0,0 @@
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0770)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.errorKernel.logDebug(er, proc.configuration)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return nil, err
}
defer f.Close()
_, err = f.Write(message.Data)
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}

View file

@ -3,23 +3,52 @@ package steward
import ( import (
"fmt" "fmt"
"io" "io"
"io/fs"
"net"
"os" "os"
"path/filepath" "path/filepath"
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
) )
func reqWriteFile(isAppend bool, proc process, message Message) error { func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
// If it was a request type message we want to check what the initial messages // If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data. // method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc) fileName, folderTree := selectFileNaming(message, proc)
file := filepath.Join(folderTree, fileName)
// Check the file is a unix socket, and if it is we write the
// data to the socket instead of writing it to a normal file.
fi, err := os.Stat(file)
if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append failed to stat filepath: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
return er
}
if fi.Mode().Type() == fs.ModeSocket {
// TODO: Write to socket
socket, err := net.Dial("unix", file)
if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
return er
}
defer socket.Close()
_, err = socket.Write([]byte(message.Data))
if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append could not write to socket: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
return er
}
}
// The file is a normal file and not a socket.
// Check if folder structure exist, if not create it. // Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) { if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0770) err := os.MkdirAll(folderTree, 0770)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) er := fmt.Errorf("error: methodREQToFile/Append failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er return er
} }
@ -37,13 +66,11 @@ func reqWriteFile(isAppend bool, proc process, message Message) error {
} }
// Open file and write data. // Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, fileFlag, 0755) f, err := os.OpenFile(file, fileFlag, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQToFile/Append: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return err return er
} }
defer f.Close() defer f.Close()
@ -51,7 +78,8 @@ func reqWriteFile(isAppend bool, proc process, message Message) error {
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile/Append: failed to write to file: file: %v, %v", file, err) er := fmt.Errorf("error: methodREQToFile/Append: failed to write to file: file: %v, %v", file, err)
proc.errorKernel.errSend(proc, message, er, logWarning)
return er
} }
return nil return nil
@ -67,7 +95,8 @@ func (m methodREQToFileAppend) getKind() Event {
// Handle appending data to file. // Handle appending data to file.
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
reqWriteFile(true, proc, message) err := reqWriteFileOrSocket(true, proc, message)
proc.errorKernel.errSend(proc, message, err, logWarning)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -86,7 +115,8 @@ func (m methodREQToFile) getKind() Event {
// Handle writing to a file. Will truncate any existing data if the file did already // Handle writing to a file. Will truncate any existing data if the file did already
// exist. // exist.
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
reqWriteFile(false, proc, message) err := reqWriteFileOrSocket(false, proc, message)
proc.errorKernel.errSend(proc, message, err, logWarning)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil