1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

del console output, add flag for block profiling

This commit is contained in:
postmannen 2021-12-31 06:59:09 +01:00
parent 31ce2e978c
commit 0d0cd8c0b6
4 changed files with 18 additions and 3 deletions

View file

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"runtime"
"time" "time"
_ "net/http/pprof" _ "net/http/pprof"
@ -38,6 +39,10 @@ func main() {
} }
if c.SetBlockProfileRate != 0 {
runtime.SetBlockProfileRate(1)
}
s, err := steward.NewServer(c, version) s, err := steward.NewServer(c, version)
if err != nil { if err != nil {
log.Printf("%v\n", err) log.Printf("%v\n", err)

View file

@ -69,6 +69,8 @@ type Configuration struct {
Compression string Compression string
// Serialization // Serialization
Serialization string Serialization string
// SetBlockProfileRate for block profiling
SetBlockProfileRate int
// NOTE: // NOTE:
// Op commands will not be specified as a flag since they can't be turned off. // Op commands will not be specified as a flag since they can't be turned off.
@ -137,6 +139,7 @@ type ConfigurationFromFile struct {
ErrorMessageRetries *int ErrorMessageRetries *int
Compression *string Compression *string
Serialization *string Serialization *string
SetBlockProfileRate *int
StartPubREQHello *int StartPubREQHello *int
StartSubREQErrorLog *bool StartSubREQErrorLog *bool
@ -190,6 +193,7 @@ func newConfigurationDefaults() Configuration {
ErrorMessageRetries: 10, ErrorMessageRetries: 10,
Compression: "", Compression: "",
Serialization: "", Serialization: "",
SetBlockProfileRate: 0,
StartSubREQErrorLog: true, StartSubREQErrorLog: true,
StartSubREQHello: true, StartSubREQHello: true,
@ -345,6 +349,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else { } else {
conf.Serialization = *cf.Serialization conf.Serialization = *cf.Serialization
} }
if cf.SetBlockProfileRate == nil {
conf.SetBlockProfileRate = cd.SetBlockProfileRate
} else {
conf.SetBlockProfileRate = *cf.SetBlockProfileRate
}
if cf.StartPubREQHello == nil { if cf.StartPubREQHello == nil {
conf.StartPubREQHello = cd.StartPubREQHello conf.StartPubREQHello = cd.StartPubREQHello
@ -479,6 +488,7 @@ func (c *Configuration) CheckFlags() error {
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it") flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it")
flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd. Undefined value will default to no compression") flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd. Undefined value will default to no compression")
flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob") flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", fc.SetBlockProfileRate, "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds") flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")

View file

@ -258,7 +258,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// Wait up until ACKTimeout specified for a reply, // Wait up until ACKTimeout specified for a reply,
// continue and resend if no reply received, // continue and resend if no reply received,
// or exit if max retries for the message reached. // or exit if max retries for the message reached.
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) _, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
if err != nil { if err != nil {
er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err) er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err)
// sendErrorLogMessage(p.toRingbufferCh, p.node, er) // sendErrorLogMessage(p.toRingbufferCh, p.node, er)
@ -296,7 +296,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
continue continue
} }
} }
log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data) // REMOVED: log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data)
} }
subReply.Unsubscribe() subReply.Unsubscribe()

View file

@ -281,7 +281,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// been processed, and that we then can delete it out of the K/V Store. // been processed, and that we then can delete it out of the K/V Store.
<-v.Data.done <-v.Data.done
log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) // log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID)) r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))
// Since we are now done with the specific message we can delete // Since we are now done with the specific message we can delete