mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 02:47:43 +00:00
initial test of jetstream consmumer and publisher
This commit is contained in:
parent
fc1bdc9d44
commit
172dfd1b8c
4 changed files with 94 additions and 6 deletions
|
@ -12,12 +12,6 @@ import (
|
||||||
// received, check the MessageType field in the message to decide what
|
// received, check the MessageType field in the message to decide what
|
||||||
// kind of message it is and then it will check how to handle that message type,
|
// kind of message it is and then it will check how to handle that message type,
|
||||||
// and then call the correct method handler for it.
|
// and then call the correct method handler for it.
|
||||||
//
|
|
||||||
// This handler function should be started in it's own go routine,so
|
|
||||||
// one individual handler is started per message received so we can keep
|
|
||||||
// the state of the message being processed, and then reply back to the
|
|
||||||
// correct sending process's reply, meaning so we ACK back to the correct
|
|
||||||
// publisher.
|
|
||||||
func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstream.Msg, subject string) {
|
func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstream.Msg, subject string) {
|
||||||
|
|
||||||
// Variable to hold a copy of the message data.
|
// Variable to hold a copy of the message data.
|
||||||
|
|
73
processes.go
73
processes.go
|
@ -7,6 +7,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -337,6 +338,78 @@ func (p *processes) Start(proc process) {
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.startup.subscriber(proc, PublicKey, nil)
|
proc.startup.subscriber(proc, PublicKey, nil)
|
||||||
|
|
||||||
|
// ProcFunc for Jetstream publishers.
|
||||||
|
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
|
js, err := jetstream.New(proc.natsConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
|
||||||
|
Name: "orders",
|
||||||
|
Description: "orders stream",
|
||||||
|
Subjects: []string{"orders.>"},
|
||||||
|
// Discard older messages and keep only the last one.
|
||||||
|
MaxMsgsPerSubject: 1,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: jetstream create or update failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
// TODO:
|
||||||
|
_, err := js.Publish(proc.ctx, "orders.shop1", []byte(fmt.Sprintf("order nr. %v", i)))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: js failed to publish message: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("published message: %v\n", i)
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
|
||||||
|
|
||||||
|
// Procfunc for Jetstream consumers.
|
||||||
|
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
|
js, err := jetstream.New(proc.natsConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: jetstream new failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := js.Stream(proc.ctx, "orders")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: js.Stream failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
|
||||||
|
Name: "order_processor",
|
||||||
|
Durable: "order_processor",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cctx, err := consumer.Consume(func(msg jetstream.Msg) {
|
||||||
|
log.Printf("Received message: %v, with data: %v\n", string(msg.Subject()), string(msg.Data()))
|
||||||
|
msg.Ack()
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: create or update consumer failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer cctx.Stop()
|
||||||
|
|
||||||
|
<-proc.ctx.Done()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop all subscriber processes.
|
// Stop all subscriber processes.
|
||||||
|
|
15
request-jetstream.go
Normal file
15
request-jetstream.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package ctrl
|
||||||
|
|
||||||
|
// jetstreamsConsumers will start up the netstream consumers.
|
||||||
|
// The consumer logic are put in the procFunc.
|
||||||
|
func jetstreamsConsumers(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// jetstreamPublishers will start up the netstream publishers.
|
||||||
|
// The publisher logic are put in the procFunc.
|
||||||
|
func jetstreamPublishers(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
|
@ -159,6 +159,10 @@ const (
|
||||||
AclExport = "aclExport"
|
AclExport = "aclExport"
|
||||||
// REQAclImport
|
// REQAclImport
|
||||||
AclImport = "aclImport"
|
AclImport = "aclImport"
|
||||||
|
// Jetstreams Consumers
|
||||||
|
JetstreamConsumers = "jetstreamConsumers"
|
||||||
|
// JetstreamPublishers
|
||||||
|
JetStreamPublishers = "jetstreamPublishers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type HandlerFunc func(proc process, message Message, node string) ([]byte, error)
|
type HandlerFunc func(proc process, message Message, node string) ([]byte, error)
|
||||||
|
@ -212,6 +216,8 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
AclExport: HandlerFunc(methodAclExport),
|
AclExport: HandlerFunc(methodAclExport),
|
||||||
AclImport: HandlerFunc(methodAclImport),
|
AclImport: HandlerFunc(methodAclImport),
|
||||||
Test: HandlerFunc(methodTest),
|
Test: HandlerFunc(methodTest),
|
||||||
|
JetstreamConsumers: HandlerFunc(jetstreamsConsumers),
|
||||||
|
JetStreamPublishers: HandlerFunc(jetstreamPublishers),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue