mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
added message.RetryWait field, upgrader nats-server package
This commit is contained in:
parent
b8a2d3d5e7
commit
7e9a7638aa
5 changed files with 35 additions and 12 deletions
14
go.mod
14
go.mod
|
@ -10,9 +10,9 @@ require (
|
|||
github.com/google/uuid v1.3.0
|
||||
github.com/hpcloud/tail v1.0.0
|
||||
github.com/jinzhu/copier v0.3.5
|
||||
github.com/klauspost/compress v1.14.2
|
||||
github.com/nats-io/nats-server/v2 v2.6.2
|
||||
github.com/nats-io/nats.go v1.14.0
|
||||
github.com/klauspost/compress v1.15.6
|
||||
github.com/nats-io/nats-server/v2 v2.8.4
|
||||
github.com/nats-io/nats.go v1.15.0
|
||||
github.com/pelletier/go-toml v1.8.1
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/rivo/tview v0.0.0-20220106183741-90d72bc664f5
|
||||
|
@ -32,7 +32,7 @@ require (
|
|||
github.com/mattn/go-runewidth v0.0.13 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/minio/highwayhash v1.0.2 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
|
@ -40,11 +40,11 @@ require (
|
|||
github.com/prometheus/procfs v0.6.0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect
|
||||
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
|
||||
google.golang.org/protobuf v1.26.0-rc.1 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
|
|
14
go.sum
14
go.sum
|
@ -72,6 +72,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
|
|||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
|
||||
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY=
|
||||
github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
|
@ -103,11 +105,17 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
|
|||
github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.6.2 h1:uMydiSENbgRPsXHBYDvVVVx1d0inut/zd+DvISIGCi8=
|
||||
github.com/nats-io/nats-server/v2 v2.6.2/go.mod h1:CNi6dJQ5H+vWqaoWKjCGtqBt7ai/xOTLiocUqhK6ews=
|
||||
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
|
||||
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
|
||||
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.14.0 h1:/QLCss4vQ6wvDpbqXucsVRDi13tFIR6kTdau+nXzKJw=
|
||||
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.15.0 h1:3IXNBolWrwIUf2soxh6Rla8gPzYWEZQBUBK6RV21s+o=
|
||||
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
@ -168,6 +176,8 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y
|
|||
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 h1:tkVvjkPTB7pnW3jnid7kNyAMPVWllTNOf/qKDze4p9o=
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
|
@ -202,6 +212,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
|
||||
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
|
@ -216,6 +228,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
|||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
|
@ -49,6 +49,8 @@ type Message struct {
|
|||
FromNode Node `json:"fromNode" yaml:"fromNode"`
|
||||
// ACKTimeout for waiting for an ack message
|
||||
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
||||
// RetryWait specified the time in seconds to wait between retries.
|
||||
RetryWait int `json:"retryWait" yaml:"retryWait"`
|
||||
// Resend retries
|
||||
Retries int `json:"retries" yaml:"retries"`
|
||||
// The ACK timeout of the new message created via a request event.
|
||||
|
|
16
process.go
16
process.go
|
@ -299,18 +299,24 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
// or exit if max retries for the message reached.
|
||||
_, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, p.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
|
||||
if err == nats.ErrNoResponders {
|
||||
// fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout)
|
||||
time.Sleep(time.Second * time.Duration(message.ACKTimeout))
|
||||
// time.Sleep(time.Second * time.Duration(message.ACKTimeout))
|
||||
if message.RetryWait < 0 {
|
||||
message.RetryWait = 0
|
||||
}
|
||||
|
||||
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, p.node, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
|
||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||
}
|
||||
|
||||
// did not receive a reply, decide what to do..
|
||||
retryAttempts++
|
||||
er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
|
||||
er := fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
|
||||
switch {
|
||||
|
|
|
@ -461,6 +461,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
|||
MethodArgs: message.ReplyMethodArgs,
|
||||
MethodTimeout: message.ReplyMethodTimeout,
|
||||
IsReply: true,
|
||||
RetryWait: message.RetryWait,
|
||||
ACKTimeout: message.ReplyACKTimeout,
|
||||
Retries: message.ReplyRetries,
|
||||
Directory: message.Directory,
|
||||
|
|
Loading…
Add table
Reference in a new issue