mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
cleaned up no longer used code
This commit is contained in:
parent
fc6e80b1bf
commit
c098c623f1
7 changed files with 33 additions and 31 deletions
|
@ -69,7 +69,7 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
|
|||
p := pki{
|
||||
// schema: make(map[Node]map[argsString]signatureBase32),
|
||||
nodesAcked: newNodesAcked(),
|
||||
nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration),
|
||||
nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(),
|
||||
configuration: configuration,
|
||||
bucketNamePublicKeys: "publicKeys",
|
||||
errorKernel: errorKernel,
|
||||
|
@ -398,7 +398,7 @@ type nodeNotAckedPublicKeys struct {
|
|||
}
|
||||
|
||||
// newNodeNotAckedPublicKeys will return a prepared type of nodePublicKeys.
|
||||
func newNodeNotAckedPublicKeys(configuration *Configuration) *nodeNotAckedPublicKeys {
|
||||
func newNodeNotAckedPublicKeys() *nodeNotAckedPublicKeys {
|
||||
n := nodeNotAckedPublicKeys{
|
||||
KeyMap: make(map[Node][]byte),
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ type process struct {
|
|||
|
||||
// prepareNewProcess will set the the provided values and the default
|
||||
// values for a process.
|
||||
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
|
||||
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
|
||||
// create the initial configuration for a sessions communicating with 1 host process.
|
||||
server.processes.mu.Lock()
|
||||
server.processes.lastProcessID++
|
||||
|
|
|
@ -390,7 +390,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
|
|||
}
|
||||
|
||||
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
|
||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
|
||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
|
||||
proc.procFunc = pf
|
||||
|
||||
go proc.spawnWorker()
|
||||
|
@ -400,7 +400,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
|
|||
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
|
||||
p.errorKernel.logDebug(er, p.configuration)
|
||||
sub := newSubject(m, string(p.node))
|
||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher, nil)
|
||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
|
||||
proc.procFunc = pf
|
||||
proc.isLongRunningPublisher = true
|
||||
|
||||
|
|
|
@ -225,14 +225,14 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error
|
|||
|
||||
// Create a new sub process that will do the actual file copying.
|
||||
|
||||
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
|
||||
|
||||
// Give the sub process a procFunc so we do the actual copying within a procFunc,
|
||||
// and not directly within the handler.
|
||||
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message)
|
||||
|
||||
// assign a handler to the sub process
|
||||
copySrcSubProc.handler = copySrcSubHandler(cia)
|
||||
copySrcSubProc.handler = copySrcSubHandler()
|
||||
|
||||
// The process will be killed when the context expires.
|
||||
go copySrcSubProc.spawnWorker()
|
||||
|
@ -280,8 +280,8 @@ func methodREQCopySrc(proc process, message Message, node string) ([]byte, error
|
|||
}
|
||||
|
||||
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
|
||||
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
|
||||
p := newProcess(ctx, server, subject, processKind, procFunc)
|
||||
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
|
||||
p := newProcess(ctx, server, subject, processKind)
|
||||
p.isSubProcess = true
|
||||
|
||||
return p
|
||||
|
@ -352,14 +352,14 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error
|
|||
}
|
||||
|
||||
// Create a new sub process that will do the actual file copying.
|
||||
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
|
||||
|
||||
// Give the sub process a procFunc so we do the actual copying within a procFunc,
|
||||
// and not directly within the handler.
|
||||
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel)
|
||||
|
||||
// assign a handler to the sub process
|
||||
copyDstSubProc.handler = copyDstSubHandler(cia)
|
||||
copyDstSubProc.handler = copyDstSubHandler()
|
||||
|
||||
// The process will be killed when the context expires.
|
||||
go copyDstSubProc.spawnWorker()
|
||||
|
@ -375,7 +375,7 @@ func methodREQCopyDst(proc process, message Message, node string) ([]byte, error
|
|||
return ackMsg, nil
|
||||
}
|
||||
|
||||
func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
|
||||
func copySrcSubHandler() func(process, Message, string) ([]byte, error) {
|
||||
h := func(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
// We should receive a ready message generated by the procFunc of Dst,
|
||||
|
@ -396,7 +396,7 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by
|
|||
return h
|
||||
}
|
||||
|
||||
func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
|
||||
func copyDstSubHandler() func(process, Message, string) ([]byte, error) {
|
||||
h := func(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
select {
|
||||
|
|
|
@ -68,7 +68,7 @@ func methodREQOpProcessStart(proc process, message Message, node string) ([]byte
|
|||
|
||||
// Create the process and start it.
|
||||
sub := newSubject(method, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
|
||||
go procNew.spawnWorker()
|
||||
|
||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
|
|
|
@ -322,7 +322,7 @@ func TestRequest(t *testing.T) {
|
|||
case fileContains:
|
||||
resultFile := filepath.Join(tstConf.SubscribersDataFolder, tt.message.Directory, string(tt.message.FromNode), tt.message.FileName)
|
||||
|
||||
found, err := findStringInFileTest(string(tt.want), resultFile, tstConf, t)
|
||||
found, err := findStringInFileTest(string(tt.want), resultFile)
|
||||
if err != nil || found == false {
|
||||
t.Fatalf(" \U0001F631 [FAILED] : %v: %v\n", tt.info, err)
|
||||
|
||||
|
@ -334,14 +334,14 @@ func TestRequest(t *testing.T) {
|
|||
|
||||
// --- Other REQ tests that does not fit well into the general table above.
|
||||
|
||||
checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir)
|
||||
checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir)
|
||||
checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir)
|
||||
checkREQCopySrc(tstSrv, tstConf, t, tstTempDir)
|
||||
checkREQTailFileTest(tstConf, t, tstTempDir)
|
||||
checkMetricValuesTest(tstSrv, t)
|
||||
checkErrorKernelMalformedJSONtest(tstConf, t)
|
||||
checkREQCopySrc(tstConf, t, tstTempDir)
|
||||
}
|
||||
|
||||
// Check the tailing of files type.
|
||||
func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error {
|
||||
func checkREQTailFileTest(conf *Configuration, t *testing.T, tmpDir string) error {
|
||||
// Create a file with some content.
|
||||
fp := filepath.Join(tmpDir, "test.file")
|
||||
fh, err := os.OpenFile(fp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
|
||||
|
@ -407,7 +407,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T,
|
|||
|
||||
cancel()
|
||||
|
||||
_, err = findStringInFileTest("some file content", resultFile, conf, t)
|
||||
_, err = findStringInFileTest("some file content", resultFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: %v", err)
|
||||
}
|
||||
|
@ -417,7 +417,7 @@ func checkREQTailFileTest(ctrlServer *server, conf *Configuration, t *testing.T,
|
|||
}
|
||||
|
||||
// Check the file copier.
|
||||
func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpDir string) error {
|
||||
func checkREQCopySrc(conf *Configuration, t *testing.T, tmpDir string) error {
|
||||
testFiles := 5
|
||||
|
||||
for i := 1; i <= testFiles; i++ {
|
||||
|
@ -474,7 +474,7 @@ func checkREQCopySrc(ctrlServer *server, conf *Configuration, t *testing.T, tmpD
|
|||
return nil
|
||||
}
|
||||
|
||||
func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error {
|
||||
func checkMetricValuesTest(ctrlServer *server, t *testing.T) error {
|
||||
mfs, err := ctrlServer.metrics.promRegistry.Gather()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error: promRegistry.gathering: %v", mfs)
|
||||
|
@ -507,7 +507,7 @@ func checkMetricValuesTest(ctrlServer *server, conf *Configuration, t *testing.T
|
|||
}
|
||||
|
||||
// Check errorKernel
|
||||
func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration, t *testing.T, tempDir string) error {
|
||||
func checkErrorKernelMalformedJSONtest(conf *Configuration, t *testing.T) error {
|
||||
|
||||
// JSON message with error, missing brace.
|
||||
m := `[
|
||||
|
@ -550,7 +550,7 @@ func checkErrorKernelMalformedJSONtest(ctrlServer *server, conf *Configuration,
|
|||
case <-chUpdated:
|
||||
// We got an update, so we continue to check if we find the string we're
|
||||
// looking for.
|
||||
found, err := findStringInFileTest("error: malformed json", resultFile, conf, t)
|
||||
found, err := findStringInFileTest("error: malformed json", resultFile)
|
||||
if !found && err != nil {
|
||||
return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: %v", err)
|
||||
}
|
||||
|
@ -605,7 +605,7 @@ func checkFileUpdated(fileRealPath string, fileUpdated chan bool) {
|
|||
}
|
||||
|
||||
// Check if a file contains the given string.
|
||||
func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) {
|
||||
func findStringInFileTest(want string, fileName string) (bool, error) {
|
||||
// Wait n seconds for the results file to be created
|
||||
n := 50
|
||||
|
||||
|
|
12
server.go
12
server.go
|
@ -101,6 +101,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
|||
case configuration.NkeySeed != "":
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to get current working directory when creating tmp seed file: %v", err)
|
||||
}
|
||||
|
||||
|
@ -113,6 +114,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
|||
|
||||
err = os.WriteFile(pth, []byte(configuration.NkeySeed), 0700)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to write temp seed file: %v", err)
|
||||
}
|
||||
|
||||
|
@ -332,7 +334,7 @@ func (s *server) Start() {
|
|||
//
|
||||
// NB: The context of the initial process are set in processes.Start.
|
||||
sub := newSubject(REQInitial, s.nodeName)
|
||||
s.processInitial = newProcess(context.TODO(), s, sub, "", nil)
|
||||
s.processInitial = newProcess(context.TODO(), s, sub, "")
|
||||
// Start all wanted subscriber processes.
|
||||
s.processes.Start(s.processInitial)
|
||||
|
||||
|
@ -342,7 +344,7 @@ func (s *server) Start() {
|
|||
// Start exposing the the data folder via HTTP if flag is set.
|
||||
if s.configuration.ExposeDataFolder != "" {
|
||||
log.Printf("info: Starting expose of data folder via HTTP\n")
|
||||
go s.exposeDataFolder(s.ctx)
|
||||
go s.exposeDataFolder()
|
||||
}
|
||||
|
||||
// Start the processing of new messages from an input channel.
|
||||
|
@ -569,9 +571,9 @@ func (s *server) routeMessagesToProcess() {
|
|||
var proc process
|
||||
switch {
|
||||
case m.IsSubPublishedMsg:
|
||||
proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
|
||||
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
|
||||
default:
|
||||
proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
|
||||
proc = newProcess(s.ctx, s, sub, processKindPublisher)
|
||||
}
|
||||
|
||||
proc.spawnWorker()
|
||||
|
@ -595,7 +597,7 @@ func (s *server) routeMessagesToProcess() {
|
|||
}()
|
||||
}
|
||||
|
||||
func (s *server) exposeDataFolder(ctx context.Context) {
|
||||
func (s *server) exposeDataFolder() {
|
||||
fileHandler := func(w http.ResponseWriter, r *http.Request) {
|
||||
// w.Header().Set("Content-Type", "text/html")
|
||||
http.FileServer(http.Dir(s.configuration.SubscribersDataFolder)).ServeHTTP(w, r)
|
||||
|
|
Loading…
Reference in a new issue