From 233a727ac45a1b618963bb1e872bdf7041f15bff Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 20 Nov 2024 17:43:26 +0100 Subject: [PATCH] only using zstd compression, and cbor serialization --- configuration_flags.go | 8 -- go.mod | 12 +- go.sum | 12 ++ process.go | 251 +++++++---------------------------------- 4 files changed, 58 insertions(+), 225 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index 4898a30..2724a85 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -76,10 +76,6 @@ type Configuration struct { ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"` // Retries for error messages ErrorMessageRetries int `comment:"Retries for error messages"` - // Compression z for zstd or g for gzip - Compression string `comment:"Compression z for zstd or g for gzip"` - // Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor - Serialization string `comment:"Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor"` // SetBlockProfileRate for block profiling SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"` // EnableSocket for enabling the creation of a ctrl.sock file @@ -177,8 +173,6 @@ func NewConfiguration() *Configuration { flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out") flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it") - flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression") - flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob") flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled") flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file") flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.") @@ -254,8 +248,6 @@ func newConfigurationDefaults() Configuration { ExposeDataFolder: "", ErrorMessageTimeout: 60, ErrorMessageRetries: 10, - Compression: "z", - Serialization: "cbor", SetBlockProfileRate: 0, EnableSocket: true, EnableSignatureCheck: false, diff --git a/go.mod b/go.mod index a20fbeb..3e80aac 100644 --- a/go.mod +++ b/go.mod @@ -10,14 +10,14 @@ require ( github.com/hpcloud/tail v1.0.0 github.com/jinzhu/copier v0.4.0 github.com/joho/godotenv v1.5.1 - github.com/klauspost/compress v1.17.0 + github.com/klauspost/compress v1.17.11 github.com/nats-io/nats-server/v2 v2.8.4 - github.com/nats-io/nats.go v1.25.0 - github.com/nats-io/nkeys v0.4.4 + github.com/nats-io/nats.go v1.37.0 + github.com/nats-io/nkeys v0.4.7 github.com/pkg/profile v1.7.0 github.com/prometheus/client_golang v1.14.0 go.etcd.io/bbolt v1.3.7 - golang.org/x/crypto v0.7.0 + golang.org/x/crypto v0.29.0 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 gopkg.in/yaml.v3 v3.0.1 ) @@ -39,8 +39,8 @@ require ( github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect diff --git a/go.sum b/go.sum index 06329a6..6ccebbd 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,8 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -65,9 +67,13 @@ github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBri github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -102,6 +108,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -116,6 +124,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= @@ -124,6 +134,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/process.go b/process.go index 058ddb5..50c93dc 100644 --- a/process.go +++ b/process.go @@ -1,16 +1,11 @@ package ctrl import ( - "bytes" - "compress/gzip" "context" "crypto/ed25519" - "encoding/gob" "errors" "fmt" - "io" "os" - "sync" "time" "github.com/fxamacker/cbor/v2" @@ -447,102 +442,42 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // the state of the message being processed, and then reply back to the // correct sending process's reply, meaning so we ACK back to the correct // publisher. -func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) { +func (p process) messageSubscriberHandlerNats(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) { // Variable to hold a copy of the message data, so we don't mess with // the original data since the original is a pointer value. msgData := make([]byte, len(msg.Data)) copy(msgData, msg.Data) - // fmt.Printf(" * DEBUG: header value on subscriberHandler: %v\n", msg.Header) - // If debugging is enabled, print the source node name of the nats messages received. if val, ok := msg.Header["fromNode"]; ok { er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject) p.errorKernel.logDebug(er) } - // If compression is used, decompress it to get the gob data. If - // compression is not used it is the gob encoded data we already - // got in msgData so we do nothing with it. - if val, ok := msg.Header["cmp"]; ok { - switch val[0] { - case "z": - zr, err := zstd.NewReader(nil) - if err != nil { - er := fmt.Errorf("error: zstd NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - return - } - msgData, err = zr.DecodeAll(msg.Data, nil) - if err != nil { - er := fmt.Errorf("error: zstd decoding failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - zr.Close() - return - } - - zr.Close() - - case "g": - r := bytes.NewReader(msgData) - gr, err := gzip.NewReader(r) - if err != nil { - er := fmt.Errorf("error: gzip NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logError) - return - } - - b, err := io.ReadAll(gr) - if err != nil { - er := fmt.Errorf("error: gzip ReadAll failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - return - } - - gr.Close() - - msgData = b - } + zr, err := zstd.NewReader(nil) + if err != nil { + er := fmt.Errorf("error: zstd NewReader failed: %v", err) + p.errorKernel.errSend(p, Message{}, er, logWarning) + return } + msgData, err = zr.DecodeAll(msg.Data, nil) + if err != nil { + er := fmt.Errorf("error: zstd decoding failed: %v", err) + p.errorKernel.errSend(p, Message{}, er, logWarning) + zr.Close() + return + } + + zr.Close() message := Message{} - // Check if serialization is specified. - // Will default to gob serialization if nothing or non existing value is specified. - if val, ok := msg.Header["serial"]; ok { - // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) - switch val[0] { - case "cbor": - err := cbor.Unmarshal(msgData, &message) - if err != nil { - er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } - default: // Deaults to gob if no match was found. - r := bytes.NewReader(msgData) - gobDec := gob.NewDecoder(r) - - err := gobDec.Decode(&message) - if err != nil { - er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } - } - - } else { - // Default to gob if serialization flag was not specified. - r := bytes.NewReader(msgData) - gobDec := gob.NewDecoder(r) - - err := gobDec.Decode(&message) - if err != nil { - er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } + err = cbor.Unmarshal(msgData, &message) + if err != nil { + er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) + p.errorKernel.errSend(p, message, er, logError) + return } // Check if it is an ACK or NACK message, and do the appropriate action accordingly. @@ -782,7 +717,7 @@ func (p process) subscribeMessages() *nats.Subscription { //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // Start up the subscriber handler. - go p.messageSubscriberHandler(p.natsConn, p.configuration.NodeName, msg, subject) + go p.messageSubscriberHandlerNats(p.natsConn, p.configuration.NodeName, msg, subject) }) if err != nil { er := fmt.Errorf("error: Subscribe failed: %v", err) @@ -797,25 +732,18 @@ func (p process) subscribeMessages() *nats.Subscription { // process. The function should be run as a goroutine, and will run // as long as the process it belongs to is running. func (p process) publishMessages(natsConn *nats.Conn) { - var once sync.Once var zEnc *zstd.Encoder - // Prepare a zstd encoder if enabled. By enabling it here before - // looping over the messages to send below, we can reuse the zstd - // encoder for all messages. - switch p.configuration.Compression { - case "z": // zstd - // enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression)) - enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) - if err != nil { - er := fmt.Errorf("error: zstd new encoder failed: %v", err) - p.errorKernel.logError(er) - os.Exit(1) - } - zEnc = enc - defer zEnc.Close() + // Prepare a zstd encoder so we can reuse the zstd encoder for all messages. + enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + if err != nil { + er := fmt.Errorf("error: zstd new encoder failed: %v", err) + p.errorKernel.logError(er) + os.Exit(1) } + zEnc = enc + defer zEnc.Close() // Adding a timer that will be used for when to remove the sub process // publisher. The timer is reset each time a message is published with @@ -859,7 +787,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { m.ArgSignature = p.addMethodArgSignature(m) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) - go p.publishAMessage(m, zEnc, &once, natsConn) + go p.publishAMessage(m, zEnc, natsConn) case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) @@ -876,7 +804,7 @@ func (p process) addMethodArgSignature(m Message) []byte { return sign } -func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) { +func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) { // Create the initial header, and set values below depending on the // various configuration options chosen. natsMsgHeader := make(nats.Header) @@ -885,94 +813,25 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, // The serialized value of the nats message payload var natsMsgPayloadSerialized []byte - // encode the message structure into gob binary format before putting - // it into a nats message. - // Prepare a gob encoder with a buffer before we start the loop - switch p.configuration.Serialization { - case "cbor": - b, err := cbor.Marshal(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.errorKernel.logDebug(er) - return - } - - natsMsgPayloadSerialized = b - natsMsgHeader["serial"] = []string{p.configuration.Serialization} - - default: - var bufGob bytes.Buffer - gobEnc := gob.NewEncoder(&bufGob) - err := gobEnc.Encode(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.errorKernel.logDebug(er) - return - } - - natsMsgPayloadSerialized = bufGob.Bytes() - natsMsgHeader["serial"] = []string{"gob"} + // encode the message structure into cbor + b, err := cbor.Marshal(m) + if err != nil { + er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) + p.errorKernel.logDebug(er) + return } + natsMsgPayloadSerialized = b + // Get the process name so we can look up the process in the // processes map, and increment the message counter. pn := processNameGet(p.subject.name(), processKindPublisher) - // The compressed value of the nats message payload. The content - // can either be compressed or in it's original form depening on - // the outcome of the switch below, and if compression were chosen - // or not. - var natsMsgPayloadCompressed []byte - // Compress the data payload if selected with configuration flag. // The compression chosen is later set in the nats msg header when // calling p.messageDeliverNats below. - switch p.configuration.Compression { - case "z": // zstd - natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil) - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - // p.zEncMutex.Lock() - // zEnc.Reset(nil) - // p.zEncMutex.Unlock() - - case "g": // gzip - var buf bytes.Buffer - func() { - gzipW := gzip.NewWriter(&buf) - defer gzipW.Close() - defer gzipW.Flush() - _, err := gzipW.Write(natsMsgPayloadSerialized) - if err != nil { - er := fmt.Errorf("error: failed to write gzip: %v", err) - p.errorKernel.logDebug(er) - return - } - - }() - - natsMsgPayloadCompressed = buf.Bytes() - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - - case "": // no compression - natsMsgPayloadCompressed = natsMsgPayloadSerialized - natsMsgHeader["cmp"] = []string{"none"} - - default: // no compression - // Allways log the error to console. - er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression") - p.errorKernel.logDebug(er) - - // We only wan't to send the error message to errorCentral once. - once.Do(func() { - p.errorKernel.logDebug(er) - }) - - // No compression, so we just assign the value of the serialized - // data directly to the variable used with messageDeliverNats. - natsMsgPayloadCompressed = natsMsgPayloadSerialized - natsMsgHeader["cmp"] = []string{"none"} - } + natsMsgPayloadCompressed := zEnc.EncodeAll(natsMsgPayloadSerialized, nil) // Create the Nats message with headers and payload, and do the // sending of the message. @@ -986,34 +845,4 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, p.processes.active.procNames[pn] = p p.processes.active.mu.Unlock() } - - // // Handle the error. - // // - // // NOTE: None of the processes above generate an error, so the the - // // if clause will never be triggered. But keeping it here as an example - // // for now for how to handle errors. - // if err != nil { - // // Create an error type which also creates a channel which the - // // errorKernel will send back the action about what to do. - // ep := errorEvent{ - // //errorType: logOnly, - // process: p, - // message: m, - // errorActionCh: make(chan errorAction), - // } - // p.errorCh <- ep - // - // // Wait for the response action back from the error kernel, and - // // decide what to do. Should we continue, quit, or .... ? - // switch <-ep.errorActionCh { - // case errActionContinue: - // // Just log and continue - // log.Printf("The errAction was continue...so we're continuing\n") - // case errActionKill: - // log.Printf("The errAction was kill...so we're killing\n") - // // .... - // default: - // log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n") - // } - // } }