diff --git a/publish.go b/publish.go index 33222cf..9381986 100644 --- a/publish.go +++ b/publish.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "time" "github.com/caddyserver/caddy/v2" @@ -60,11 +61,18 @@ func (p Publish) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhtt if err != nil { return err } + defer r.Body.Close() p.logger.Debug("publishing NATS message", zap.String("subject", subj), zap.Bool("with_reply", p.WithReply), zap.Int64("timeout", p.Timeout)) + //map only last header + headers := map[string]string{} + for k, v := range r.Header { + headers[k] = v[0] + } + if p.WithReply { - return p.natsRequestReply(subj, data, w) + return p.natsRequestReply(subj, data, w, headers) } // Otherwise. just publish like normal @@ -76,8 +84,15 @@ func (p Publish) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhtt return next.ServeHTTP(w, r) } -func (p Publish) natsRequestReply(subject string, reqBody []byte, w http.ResponseWriter) error { - m, err := p.app.conn.Request(subject, reqBody, time.Duration(p.Timeout)*time.Millisecond) +func (p Publish) natsRequestReply(subject string, reqBody []byte, w http.ResponseWriter, headers map[string]string) error { + msg := nats.NewMsg(subject) + + for k, v := range headers { + msg.Header.Set(k, v) + } + msg.Data = reqBody + + m, err := p.app.conn.RequestMsg(msg, time.Duration(p.Timeout)*time.Millisecond) // TODO: Make error handlers configurable if err == nats.ErrNoResponders { @@ -88,6 +103,16 @@ func (p Publish) natsRequestReply(subject string, reqBody []byte, w http.Respons return err } + // handle nats micro response status + code := m.Header.Get("Nats-Service-Error-Code") + if code != "" && code != "200" { + status, err := strconv.Atoi(code) + if err != nil { + return err + } + w.WriteHeader(status) + } + _, err = w.Write(m.Data) return err