diff --git a/go.mod b/go.mod index 209c108..cb67fef 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,9 @@ require ( github.com/google/uuid v1.3.0 github.com/hpcloud/tail v1.0.0 github.com/jinzhu/copier v0.3.5 - github.com/klauspost/compress v1.14.2 - github.com/nats-io/nats-server/v2 v2.6.2 - github.com/nats-io/nats.go v1.14.0 + github.com/klauspost/compress v1.15.6 + github.com/nats-io/nats-server/v2 v2.8.4 + github.com/nats-io/nats.go v1.15.0 github.com/pelletier/go-toml v1.8.1 github.com/prometheus/client_golang v1.11.0 github.com/rivo/tview v0.0.0-20220106183741-90d72bc664f5 @@ -32,7 +32,7 @@ require ( github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect + github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -40,11 +40,11 @@ require ( github.com/prometheus/procfs v0.6.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect - golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect + golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect + golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect + golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect google.golang.org/protobuf v1.26.0-rc.1 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/go.sum b/go.sum index aa1d81f..84dfb42 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY= +github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -103,11 +105,17 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.6.2 h1:uMydiSENbgRPsXHBYDvVVVx1d0inut/zd+DvISIGCi8= github.com/nats-io/nats-server/v2 v2.6.2/go.mod h1:CNi6dJQ5H+vWqaoWKjCGtqBt7ai/xOTLiocUqhK6ews= +github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= +github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.14.0 h1:/QLCss4vQ6wvDpbqXucsVRDi13tFIR6kTdau+nXzKJw= github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.15.0 h1:3IXNBolWrwIUf2soxh6Rla8gPzYWEZQBUBK6RV21s+o= +github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -168,6 +176,8 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 h1:tkVvjkPTB7pnW3jnid7kNyAMPVWllTNOf/qKDze4p9o= golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -202,6 +212,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU= +golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -216,6 +228,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/message_and_subject.go b/message_and_subject.go index 7e1a68c..ddc6803 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -49,6 +49,8 @@ type Message struct { FromNode Node `json:"fromNode" yaml:"fromNode"` // ACKTimeout for waiting for an ack message ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` + // RetryWait specified the time in seconds to wait between retries. + RetryWait int `json:"retryWait" yaml:"retryWait"` // Resend retries Retries int `json:"retries" yaml:"retries"` // The ACK timeout of the new message created via a request event. diff --git a/process.go b/process.go index d5462b6..3474f6a 100644 --- a/process.go +++ b/process.go @@ -299,18 +299,24 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // or exit if max retries for the message reached. _, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) if err != nil { - er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err) - // sendErrorLogMessage(p.toRingbufferCh, p.node, er) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) if err == nats.ErrNoResponders { // fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout) - time.Sleep(time.Second * time.Duration(message.ACKTimeout)) + // time.Sleep(time.Second * time.Duration(message.ACKTimeout)) + if message.RetryWait < 0 { + message.RetryWait = 0 + } + + er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) + // sendErrorLogMessage(p.toRingbufferCh, p.node, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + time.Sleep(time.Second * time.Duration(message.RetryWait)) } // did not receive a reply, decide what to do.. retryAttempts++ - er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) + er := fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) switch { diff --git a/requests.go b/requests.go index 09fef98..a67871f 100644 --- a/requests.go +++ b/requests.go @@ -461,6 +461,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { MethodArgs: message.ReplyMethodArgs, MethodTimeout: message.ReplyMethodTimeout, IsReply: true, + RetryWait: message.RetryWait, ACKTimeout: message.ReplyACKTimeout, Retries: message.ReplyRetries, Directory: message.Directory,