2023-10-04 20:58:42 +00:00
package ctrl
2022-05-19 18:54:33 +00:00
import (
"fmt"
2023-04-18 04:29:59 +00:00
"io"
2023-05-31 07:29:00 +00:00
"io/fs"
"net"
2022-05-19 18:54:33 +00:00
"os"
"path/filepath"
"github.com/hpcloud/tail"
)
2023-05-31 07:29:00 +00:00
func reqWriteFileOrSocket ( isAppend bool , proc process , message Message ) error {
2022-05-19 18:54:33 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName , folderTree := selectFileNaming ( message , proc )
2023-05-31 07:29:00 +00:00
file := filepath . Join ( folderTree , fileName )
2023-06-06 10:23:26 +00:00
// log.Fatalf("EXITING\n")
2023-05-31 07:29:00 +00:00
// Check the file is a unix socket, and if it is we write the
// data to the socket instead of writing it to a normal file.
fi , err := os . Stat ( file )
2023-06-06 10:23:26 +00:00
if err == nil && ! os . IsNotExist ( err ) {
er := fmt . Errorf ( "info: reqWriteFileOrSocket: failed to stat file, but will continue: %v" , folderTree )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2023-06-06 10:23:26 +00:00
}
if fi != nil && fi . Mode ( ) . Type ( ) == fs . ModeSocket {
socket , err := net . Dial ( "unix" , file )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2023-06-06 10:23:26 +00:00
return er
}
defer socket . Close ( )
2023-05-31 07:29:00 +00:00
2023-06-06 10:23:26 +00:00
_ , err = socket . Write ( [ ] byte ( message . Data ) )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodToFile/Append could not write to socket: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2023-06-06 10:23:26 +00:00
return er
2023-05-31 07:29:00 +00:00
}
}
// The file is a normal file and not a socket.
2022-05-19 18:54:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
2023-01-10 05:50:28 +00:00
err := os . MkdirAll ( folderTree , 0770 )
2022-05-19 18:54:33 +00:00
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodToFile/Append failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2023-05-31 03:06:00 +00:00
return er
2022-05-19 18:54:33 +00:00
}
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( er )
2022-05-19 18:54:33 +00:00
}
2023-05-31 03:06:00 +00:00
var fileFlag int
switch isAppend {
case true :
fileFlag = os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC
case false :
fileFlag = os . O_CREATE | os . O_RDWR | os . O_TRUNC
}
2022-05-19 18:54:33 +00:00
// Open file and write data.
2023-05-31 03:06:00 +00:00
f , err := os . OpenFile ( file , fileFlag , 0755 )
2022-05-19 18:54:33 +00:00
if err != nil {
2024-12-03 15:17:33 +00:00
er := fmt . Errorf ( "error: methodToFile/Append: failed to open file %v, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , file , message . Directory , message . FileName , err )
fmt . Printf ( "%v\n" , er )
2023-05-31 07:29:00 +00:00
return er
2022-05-19 18:54:33 +00:00
}
defer f . Close ( )
_ , err = f . Write ( message . Data )
f . Sync ( )
if err != nil {
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodToFile/Append: failed to write to file: file: %v, %v" , file , err )
2023-05-31 07:29:00 +00:00
return er
2022-05-19 18:54:33 +00:00
}
2023-05-31 03:06:00 +00:00
return nil
}
// Handle appending data to file.
2024-11-19 02:48:42 +00:00
func methodFileAppend ( proc process , message Message , node string ) ( [ ] byte , error ) {
2023-05-31 07:29:00 +00:00
err := reqWriteFileOrSocket ( true , proc , message )
proc . errorKernel . errSend ( proc , message , err , logWarning )
2023-05-31 03:06:00 +00:00
2022-05-19 18:54:33 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// -----
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
2024-11-19 02:48:42 +00:00
func methodToFile ( proc process , message Message , node string ) ( [ ] byte , error ) {
2023-05-31 07:29:00 +00:00
err := reqWriteFileOrSocket ( false , proc , message )
proc . errorKernel . errSend ( proc , message , err , logWarning )
2022-05-19 18:54:33 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2024-11-19 02:48:42 +00:00
// --- methodTailFile
2022-05-19 18:54:33 +00:00
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
2024-11-19 02:48:42 +00:00
func methodTailFile ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-05-19 18:54:33 +00:00
inf := fmt . Errorf ( "<--- TailFile REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2024-03-27 11:48:17 +00:00
proc . errorKernel . logDebug ( inf )
2022-05-19 18:54:33 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 1 :
2024-11-19 02:48:42 +00:00
er := fmt . Errorf ( "error: methodTailFile: got <1 number methodArgs" )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-05-19 18:54:33 +00:00
return
}
fp := message . MethodArgs [ 0 ]
// var ctx context.Context
// var cancel context.CancelFunc
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
// Note: Replacing the 0 timeout with specific timeout.
// if message.MethodTimeout != 0 {
// ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// } else {
// ctx, cancel = context.WithCancel(proc.ctx)
// }
outCh := make ( chan [ ] byte )
t , err := tail . TailFile ( fp , tail . Config { Follow : true , Location : & tail . SeekInfo {
Offset : 0 ,
2023-04-18 04:29:59 +00:00
Whence : io . SeekEnd ,
2022-05-19 18:54:33 +00:00
} } )
if err != nil {
er := fmt . Errorf ( "error: methodREQToTailFile: tailFile: %v" , err )
2023-01-11 07:38:15 +00:00
proc . errorKernel . errSend ( proc , message , er , logWarning )
2022-05-19 18:54:33 +00:00
}
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
for {
select {
case line := <- t . Lines :
outCh <- [ ] byte ( line . Text + "\n" )
case <- ctx . Done ( ) :
return
}
}
} ( )
for {
select {
case <- ctx . Done ( ) :
cancel ( )
// Close the lines channel so we exit the reading lines
// go routine.
// close(t.Lines)
er := fmt . Errorf ( "info: method timeout reached REQTailFile, canceling: %v" , message . MethodArgs )
proc . errorKernel . infoSend ( proc , message , er )
return
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}