2021-10-23 01:26:01 +00:00
package server
import (
"bytes"
2021-10-29 03:50:38 +00:00
"context"
2021-12-23 23:03:04 +00:00
"embed"
2021-10-23 01:26:01 +00:00
"encoding/json"
2021-10-23 17:21:33 +00:00
"fmt"
2021-11-08 14:24:34 +00:00
"html/template"
2021-10-23 01:26:01 +00:00
"io"
"log"
2021-10-24 02:49:50 +00:00
"net"
2021-10-23 01:26:01 +00:00
"net/http"
"regexp"
2021-10-29 17:58:14 +00:00
"strconv"
2021-10-23 01:26:01 +00:00
"strings"
"sync"
"time"
2021-12-25 16:26:18 +00:00
firebase "firebase.google.com/go"
"firebase.google.com/go/messaging"
"google.golang.org/api/option"
"heckel.io/ntfy/util"
2021-10-23 01:26:01 +00:00
)
2021-10-29 17:58:14 +00:00
// TODO add "max messages in a topic" limit
2021-11-01 20:39:40 +00:00
// TODO implement "since=<ID>"
2021-10-29 17:58:14 +00:00
2021-12-07 16:45:15 +00:00
// Server is the main server, providing the UI and API for ntfy
2021-10-23 01:26:01 +00:00
type Server struct {
2021-12-22 13:17:50 +00:00
config * Config
httpServer * http . Server
httpsServer * http . Server
topics map [ string ] * topic
visitors map [ string ] * visitor
firebase subscriber
2021-12-23 23:03:04 +00:00
mailer mailer
2021-12-22 13:17:50 +00:00
messages int64
cache cache
closeChan chan bool
mu sync . Mutex
2021-10-23 01:26:01 +00:00
}
2021-10-24 02:49:50 +00:00
// errHTTP is the error type returned by request handlers for any non-200
// response. It carries both an application-specific code and the HTTP status
// to send, and serializes to the JSON error body written by handle.
type errHTTP struct {
	Code     int    `json:"code,omitempty"` // application error code (e.g. 40401); omitted from JSON when zero
	HTTPCode int    `json:"http"`           // HTTP status code to respond with
	Message  string `json:"error"`          // human-readable error message
	Link     string `json:"link,omitempty"` // optional documentation link; omitted from JSON when empty
}

// Error implements the error interface and returns the plain message.
func (e errHTTP) Error() string {
	return e.Message
}

// JSON renders the error as a single-line JSON object. The marshalling error
// is deliberately discarded: the struct only contains ints and strings.
func (e errHTTP) JSON() string {
	encoded, _ := json.Marshal(&e)
	return string(encoded)
}
2021-11-08 14:24:34 +00:00
// indexPage is the data passed to the index template when rendering the
// landing page or a topic page.
type indexPage struct {
	Topic         string        // topic name taken from the request path; empty for "/"
	CacheDuration time.Duration // configured cache duration, copied from the server config
}
2021-11-08 14:46:31 +00:00
// sinceTime is the lower time bound used when fetching cached messages. Two
// sentinel values (sinceAllMessages, sinceNoMessages) encode "everything" and
// "nothing" respectively.
type sinceTime time.Time

// IsAll reports whether t is the "all messages" sentinel.
//
// The comparison uses Time.Equal rather than ==, so that the same instant
// expressed in a different location is still recognized as the sentinel.
func (t sinceTime) IsAll() bool {
	return t.Time().Equal(sinceAllMessages.Time())
}

// IsNone reports whether t is the "no messages" sentinel. See IsAll for why
// Time.Equal is used.
func (t sinceTime) IsNone() bool {
	return t.Time().Equal(sinceNoMessages.Time())
}

// Time converts t back to a regular time.Time.
func (t sinceTime) Time() time.Time {
	return time.Time(t)
}

var (
	sinceAllMessages = sinceTime(time.Unix(0, 0)) // sentinel: Unix second 0
	sinceNoMessages  = sinceTime(time.Unix(1, 0)) // sentinel: Unix second 1
)
2021-10-23 01:26:01 +00:00
var (
2021-11-08 14:24:34 +00:00
topicRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}$ ` ) // Regex must match JS & Android app!
2021-11-15 12:56:58 +00:00
jsonRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/json$ ` )
sseRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/sse$ ` )
rawRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/raw$ ` )
2021-12-15 21:12:40 +00:00
sendRegex = regexp . MustCompile ( ` ^/[-_A-Za-z0-9] { 1,64}(,[-_A-Za-z0-9] { 1,64})*/(publish|send|trigger)$ ` )
2021-10-29 17:58:14 +00:00
2021-12-09 03:13:59 +00:00
staticRegex = regexp . MustCompile ( ` ^/static/.+ ` )
docsRegex = regexp . MustCompile ( ` ^/docs(|/.*)$ ` )
disallowedTopics = [ ] string { "docs" , "static" }
2021-10-23 01:26:01 +00:00
2021-12-09 15:23:17 +00:00
templateFnMap = template . FuncMap {
"durationToHuman" : util . DurationToHuman ,
}
2021-11-08 14:24:34 +00:00
//go:embed "index.gohtml"
indexSource string
2021-12-09 15:23:17 +00:00
indexTemplate = template . Must ( template . New ( "index" ) . Funcs ( templateFnMap ) . Parse ( indexSource ) )
2021-10-24 01:29:45 +00:00
2021-11-18 14:22:33 +00:00
//go:embed "example.html"
2021-11-27 21:12:08 +00:00
exampleSource string
2021-11-18 14:22:33 +00:00
2021-10-24 18:22:53 +00:00
//go:embed static
2021-11-29 14:34:43 +00:00
webStaticFs embed . FS
webStaticFsCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : webStaticFs }
2021-10-24 18:22:53 +00:00
2021-12-02 22:27:31 +00:00
//go:embed docs
2021-12-07 15:38:58 +00:00
docsStaticFs embed . FS
2021-12-02 22:27:31 +00:00
docsStaticCached = & util . CachingEmbedFS { ModTime : time . Now ( ) , FS : docsStaticFs }
2021-12-25 14:21:41 +00:00
errHTTPNotFound = & errHTTP { 40401 , http . StatusNotFound , "page not found" , "" }
2021-12-25 14:15:05 +00:00
errHTTPTooManyRequestsLimitRequests = & errHTTP { 42901 , http . StatusTooManyRequests , "limit reached: too many requests, please be nice" , "https://ntfy.sh/docs/publish/#limitations" }
errHTTPTooManyRequestsLimitEmails = & errHTTP { 42902 , http . StatusTooManyRequests , "limit reached: too many emails, please be nice" , "https://ntfy.sh/docs/publish/#limitations" }
errHTTPTooManyRequestsLimitSubscriptions = & errHTTP { 42903 , http . StatusTooManyRequests , "limit reached: too many active subscriptions, please be nice" , "https://ntfy.sh/docs/publish/#limitations" }
errHTTPTooManyRequestsLimitGlobalTopics = & errHTTP { 42904 , http . StatusTooManyRequests , "limit reached: the total number of topics on the server has been reached, please contact the admin" , "https://ntfy.sh/docs/publish/#limitations" }
errHTTPBadRequestEmailDisabled = & errHTTP { 40001 , http . StatusBadRequest , "e-mail notifications are not enabled" , "https://ntfy.sh/docs/config/#e-mail-notifications" }
errHTTPBadRequestDelayNoCache = & errHTTP { 40002 , http . StatusBadRequest , "cannot disable cache for delayed message" , "" }
errHTTPBadRequestDelayNoEmail = & errHTTP { 40003 , http . StatusBadRequest , "delayed e-mail notifications are not supported" , "" }
errHTTPBadRequestDelayCannotParse = & errHTTP { 40004 , http . StatusBadRequest , "invalid delay parameter: unable to parse delay" , "https://ntfy.sh/docs/publish/#scheduled-delivery" }
errHTTPBadRequestDelayTooSmall = & errHTTP { 40005 , http . StatusBadRequest , "invalid delay parameter: too small, please refer to the docs" , "https://ntfy.sh/docs/publish/#scheduled-delivery" }
errHTTPBadRequestDelayTooLarge = & errHTTP { 40006 , http . StatusBadRequest , "invalid delay parameter: too large, please refer to the docs" , "https://ntfy.sh/docs/publish/#scheduled-delivery" }
errHTTPBadRequestPriorityInvalid = & errHTTP { 40007 , http . StatusBadRequest , "invalid priority parameter" , "https://ntfy.sh/docs/publish/#message-priority" }
errHTTPBadRequestSinceInvalid = & errHTTP { 40008 , http . StatusBadRequest , "invalid since parameter" , "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages" }
errHTTPBadRequestTopicInvalid = & errHTTP { 40009 , http . StatusBadRequest , "invalid topic: path invalid" , "" }
errHTTPBadRequestTopicDisallowed = & errHTTP { 40010 , http . StatusBadRequest , "invalid topic: topic name is disallowed" , "" }
errHTTPInternalError = & errHTTP { 50001 , http . StatusInternalServerError , "internal server error" , "" }
2021-10-23 01:26:01 +00:00
)
2021-12-14 03:30:28 +00:00
const (
	// firebaseControlTopic is the name of the Firebase control topic.
	firebaseControlTopic = "~control" // See Android if changed

	// emptyMessageBody is the message text substituted in handlePublish when
	// a message is published without a body.
	emptyMessageBody = "triggered"
)
2021-12-07 16:45:15 +00:00
// New instantiates a new Server. It creates the cache and adds a Firebase
// subscriber (if configured).
2021-12-19 03:02:36 +00:00
func New ( conf * Config ) ( * Server , error ) {
2021-10-29 17:58:14 +00:00
var firebaseSubscriber subscriber
2021-10-29 03:50:38 +00:00
if conf . FirebaseKeyFile != "" {
2021-10-29 17:58:14 +00:00
var err error
firebaseSubscriber , err = createFirebaseSubscriber ( conf )
2021-10-29 03:50:38 +00:00
if err != nil {
return nil , err
}
}
2021-12-23 23:03:04 +00:00
var mailer mailer
if conf . SMTPAddr != "" {
mailer = & smtpMailer { config : conf }
}
2021-11-03 01:09:49 +00:00
cache , err := createCache ( conf )
2021-11-02 18:08:21 +00:00
if err != nil {
return nil , err
}
2021-11-03 01:09:49 +00:00
topics , err := cache . Topics ( )
if err != nil {
return nil , err
2021-11-02 18:08:21 +00:00
}
2021-10-23 01:26:01 +00:00
return & Server {
2021-10-24 02:49:50 +00:00
config : conf ,
2021-11-03 01:09:49 +00:00
cache : cache ,
2021-10-29 17:58:14 +00:00
firebase : firebaseSubscriber ,
2021-12-23 23:03:04 +00:00
mailer : mailer ,
2021-11-02 18:08:21 +00:00
topics : topics ,
2021-10-24 02:49:50 +00:00
visitors : make ( map [ string ] * visitor ) ,
2021-10-29 03:50:38 +00:00
} , nil
2021-10-23 01:26:01 +00:00
}
2021-12-19 03:02:36 +00:00
func createCache ( conf * Config ) ( cache , error ) {
2021-12-09 15:23:17 +00:00
if conf . CacheDuration == 0 {
return newNopCache ( ) , nil
} else if conf . CacheFile != "" {
2021-11-03 01:09:49 +00:00
return newSqliteCache ( conf . CacheFile )
2021-11-02 18:08:21 +00:00
}
2021-11-03 01:09:49 +00:00
return newMemCache ( ) , nil
2021-11-02 18:08:21 +00:00
}
2021-12-19 03:02:36 +00:00
func createFirebaseSubscriber ( conf * Config ) ( subscriber , error ) {
2021-10-29 17:58:14 +00:00
fb , err := firebase . NewApp ( context . Background ( ) , nil , option . WithCredentialsFile ( conf . FirebaseKeyFile ) )
if err != nil {
return nil , err
}
msg , err := fb . Messaging ( context . Background ( ) )
if err != nil {
return nil , err
}
return func ( m * message ) error {
2021-12-14 03:30:28 +00:00
var data map [ string ] string // Matches https://ntfy.sh/docs/subscribe/api/#json-message-format
switch m . Event {
case keepaliveEvent , openEvent :
data = map [ string ] string {
"id" : m . ID ,
"time" : fmt . Sprintf ( "%d" , m . Time ) ,
"event" : m . Event ,
"topic" : m . Topic ,
}
case messageEvent :
data = map [ string ] string {
2021-11-27 21:12:08 +00:00
"id" : m . ID ,
"time" : fmt . Sprintf ( "%d" , m . Time ) ,
"event" : m . Event ,
"topic" : m . Topic ,
"priority" : fmt . Sprintf ( "%d" , m . Priority ) ,
"tags" : strings . Join ( m . Tags , "," ) ,
"title" : m . Title ,
"message" : m . Message ,
2021-12-14 03:30:28 +00:00
}
}
_ , err := msg . Send ( context . Background ( ) , & messaging . Message {
Topic : m . Topic ,
Data : data ,
2021-10-29 17:58:14 +00:00
} )
return err
} , nil
}
2021-12-07 16:45:15 +00:00
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
// a manager go routine to print stats and prune messages.
2021-10-23 01:26:01 +00:00
func ( s * Server ) Run ( ) error {
2021-12-02 13:52:48 +00:00
listenStr := fmt . Sprintf ( "%s/http" , s . config . ListenHTTP )
if s . config . ListenHTTPS != "" {
listenStr += fmt . Sprintf ( " %s/https" , s . config . ListenHTTPS )
}
log . Printf ( "Listening on %s" , listenStr )
2021-12-22 22:45:19 +00:00
mux := http . NewServeMux ( )
mux . HandleFunc ( "/" , s . handle )
2021-12-02 13:52:48 +00:00
errChan := make ( chan error )
2021-12-22 13:17:50 +00:00
s . mu . Lock ( )
s . closeChan = make ( chan bool )
2021-12-22 22:45:19 +00:00
s . httpServer = & http . Server { Addr : s . config . ListenHTTP , Handler : mux }
2021-12-02 13:52:48 +00:00
go func ( ) {
2021-12-22 13:17:50 +00:00
errChan <- s . httpServer . ListenAndServe ( )
2021-12-02 13:52:48 +00:00
} ( )
if s . config . ListenHTTPS != "" {
2021-12-22 22:45:19 +00:00
s . httpsServer = & http . Server { Addr : s . config . ListenHTTP , Handler : mux }
2021-12-02 13:52:48 +00:00
go func ( ) {
2021-12-22 13:17:50 +00:00
errChan <- s . httpsServer . ListenAndServeTLS ( s . config . CertFile , s . config . KeyFile )
2021-12-02 13:52:48 +00:00
} ( )
}
2021-12-22 13:17:50 +00:00
s . mu . Unlock ( )
2021-12-22 22:20:43 +00:00
go s . runManager ( )
go s . runAtSender ( )
go s . runFirebaseKeepliver ( )
2021-12-02 13:52:48 +00:00
return <- errChan
2021-10-23 01:26:01 +00:00
}
2021-12-22 13:17:50 +00:00
// Stop closes the HTTP (+HTTPS) listeners and closes closeChan to signal
// shutdown. It is safe to call before Run; calling it more than once after
// Run is not supported, because closeChan would be closed twice.
func (s *Server) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Close errors are ignored: shutdown is best-effort.
	if s.httpServer != nil {
		s.httpServer.Close()
	}
	if s.httpsServer != nil {
		s.httpsServer.Close()
	}
	// closeChan is only created in Run; closing a nil channel would panic.
	if s.closeChan != nil {
		close(s.closeChan)
	}
}
2021-10-23 01:26:01 +00:00
func ( s * Server ) handle ( w http . ResponseWriter , r * http . Request ) {
if err := s . handleInternal ( w , r ) ; err != nil {
2021-12-25 14:15:05 +00:00
var e * errHTTP
var ok bool
if e , ok = err . ( * errHTTP ) ; ! ok {
e = errHTTPInternalError
2021-10-24 02:49:50 +00:00
}
2021-12-25 14:15:05 +00:00
log . Printf ( "[%s] %s - %d - %s" , r . RemoteAddr , r . Method , e . HTTPCode , err . Error ( ) )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
w . WriteHeader ( e . HTTPCode )
io . WriteString ( w , e . JSON ( ) + "\n" )
2021-10-23 01:26:01 +00:00
}
}
func ( s * Server ) handleInternal ( w http . ResponseWriter , r * http . Request ) error {
2021-12-02 22:27:31 +00:00
if r . Method == http . MethodGet && r . URL . Path == "/" {
2021-10-23 01:26:01 +00:00
return s . handleHome ( w , r )
2021-11-18 14:22:33 +00:00
} else if r . Method == http . MethodGet && r . URL . Path == "/example.html" {
return s . handleExample ( w , r )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodHead && r . URL . Path == "/" {
return s . handleEmpty ( w , r )
2021-10-24 18:22:53 +00:00
} else if r . Method == http . MethodGet && staticRegex . MatchString ( r . URL . Path ) {
return s . handleStatic ( w , r )
2021-12-02 22:27:31 +00:00
} else if r . Method == http . MethodGet && docsRegex . MatchString ( r . URL . Path ) {
return s . handleDocs ( w , r )
2021-11-05 17:46:27 +00:00
} else if r . Method == http . MethodOptions {
return s . handleOptions ( w , r )
2021-12-02 22:27:31 +00:00
} else if r . Method == http . MethodGet && topicRegex . MatchString ( r . URL . Path ) {
return s . handleHome ( w , r )
2021-10-29 17:58:14 +00:00
} else if ( r . Method == http . MethodPut || r . Method == http . MethodPost ) && topicRegex . MatchString ( r . URL . Path ) {
2021-11-05 17:46:27 +00:00
return s . withRateLimit ( w , r , s . handlePublish )
2021-12-15 14:41:55 +00:00
} else if r . Method == http . MethodGet && sendRegex . MatchString ( r . URL . Path ) {
return s . withRateLimit ( w , r , s . handlePublish )
2021-10-23 01:26:01 +00:00
} else if r . Method == http . MethodGet && jsonRegex . MatchString ( r . URL . Path ) {
2021-11-05 17:46:27 +00:00
return s . withRateLimit ( w , r , s . handleSubscribeJSON )
2021-10-23 17:21:33 +00:00
} else if r . Method == http . MethodGet && sseRegex . MatchString ( r . URL . Path ) {
2021-11-05 17:46:27 +00:00
return s . withRateLimit ( w , r , s . handleSubscribeSSE )
2021-10-24 01:29:45 +00:00
} else if r . Method == http . MethodGet && rawRegex . MatchString ( r . URL . Path ) {
2021-11-05 17:46:27 +00:00
return s . withRateLimit ( w , r , s . handleSubscribeRaw )
2021-10-23 01:26:01 +00:00
}
2021-10-24 02:49:50 +00:00
return errHTTPNotFound
2021-10-23 01:26:01 +00:00
}
func ( s * Server ) handleHome ( w http . ResponseWriter , r * http . Request ) error {
2021-11-08 14:24:34 +00:00
return indexTemplate . Execute ( w , & indexPage {
Topic : r . URL . Path [ 1 : ] ,
2021-12-09 15:23:17 +00:00
CacheDuration : s . config . CacheDuration ,
2021-11-08 14:24:34 +00:00
} )
2021-10-23 01:26:01 +00:00
}
2021-12-07 16:45:15 +00:00
func ( s * Server ) handleEmpty ( _ http . ResponseWriter , _ * http . Request ) error {
2021-11-05 17:46:27 +00:00
return nil
}
2021-12-07 16:45:15 +00:00
func ( s * Server ) handleExample ( w http . ResponseWriter , _ * http . Request ) error {
2021-11-18 14:22:33 +00:00
_ , err := io . WriteString ( w , exampleSource )
return err
}
2021-10-29 17:58:14 +00:00
func ( s * Server ) handleStatic ( w http . ResponseWriter , r * http . Request ) error {
2021-11-29 14:34:43 +00:00
http . FileServer ( http . FS ( webStaticFsCached ) ) . ServeHTTP ( w , r )
2021-10-29 17:58:14 +00:00
return nil
}
2021-12-02 22:27:31 +00:00
// handleDocs serves files from the embedded docs file system via the standard
// library file server. It always returns nil; the file server writes its own
// error responses.
func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request) error {
	fileServer := http.FileServer(http.FS(docsStaticCached))
	fileServer.ServeHTTP(w, r)
	return nil
}
2021-12-23 23:03:04 +00:00
func ( s * Server ) handlePublish ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-12-15 14:41:55 +00:00
t , err := s . topicFromPath ( r . URL . Path )
2021-11-01 20:39:40 +00:00
if err != nil {
return err
}
2021-12-11 03:57:01 +00:00
reader := io . LimitReader ( r . Body , int64 ( s . config . MessageLimit ) )
2021-10-23 01:26:01 +00:00
b , err := io . ReadAll ( reader )
if err != nil {
return err
}
2021-12-15 14:41:55 +00:00
m := newDefaultMessage ( t . ID , strings . TrimSpace ( string ( b ) ) )
2021-12-23 20:04:17 +00:00
cache , firebase , email , err := s . parseParams ( r , m )
2021-12-10 16:31:42 +00:00
if err != nil {
2021-10-29 03:50:38 +00:00
return err
}
2021-12-25 16:26:18 +00:00
if r . Method == http . MethodGet && unifiedpush {
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
_ , err := io . WriteString ( w , ` { "unifiedpush": { "version":1}} ` )
return err
}
2021-12-23 23:03:04 +00:00
if email != "" {
if err := v . EmailAllowed ( ) ; err != nil {
2021-12-25 14:15:05 +00:00
return errHTTPTooManyRequestsLimitEmails
2021-12-23 23:03:04 +00:00
}
}
2021-12-25 16:26:18 +00:00
m . UnifiedPush = unifiedpush
2021-12-23 23:03:04 +00:00
if s . mailer == nil && email != "" {
2021-12-25 14:15:05 +00:00
return errHTTPBadRequestEmailDisabled
2021-12-23 23:03:04 +00:00
}
2021-12-15 14:41:55 +00:00
if m . Message == "" {
2021-12-23 23:03:04 +00:00
m . Message = emptyMessageBody
2021-12-15 14:41:55 +00:00
}
2021-12-10 16:31:42 +00:00
delayed := m . Time > time . Now ( ) . Unix ( )
if ! delayed {
if err := t . Publish ( m ) ; err != nil {
return err
}
}
2021-12-25 16:26:18 +00:00
if s . firebase != nil && firebase && ! delayed && ! unifiedpush {
2021-12-09 17:15:17 +00:00
go func ( ) {
if err := s . firebase ( m ) ; err != nil {
log . Printf ( "Unable to publish to Firebase: %v" , err . Error ( ) )
}
} ( )
}
2021-12-25 16:26:18 +00:00
if s . mailer != nil && email != "" && ! delayed && ! unifiedpush {
2021-12-23 20:04:17 +00:00
go func ( ) {
2021-12-24 14:01:29 +00:00
if err := s . mailer . Send ( v . ip , email , m ) ; err != nil {
2021-12-23 20:04:17 +00:00
log . Printf ( "Unable to send email: %v" , err . Error ( ) )
}
} ( )
}
2021-12-25 16:26:18 +00:00
2021-12-09 15:23:17 +00:00
if cache {
if err := s . cache . AddMessage ( m ) ; err != nil {
return err
}
2021-11-02 18:08:21 +00:00
}
2021-12-15 21:12:40 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-10-24 17:34:15 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
2021-12-25 16:26:18 +00:00
2021-11-03 15:33:34 +00:00
if err := json . NewEncoder ( w ) . Encode ( m ) ; err != nil {
return err
}
2021-12-15 21:12:40 +00:00
s . inc ( & s . messages )
2021-10-24 17:34:15 +00:00
return nil
2021-10-23 01:26:01 +00:00
}
2021-12-23 20:04:17 +00:00
func ( s * Server ) parseParams ( r * http . Request , m * message ) ( cache bool , firebase bool , email string , err error ) {
2021-12-15 14:41:55 +00:00
cache = readParam ( r , "x-cache" , "cache" ) != "no"
firebase = readParam ( r , "x-firebase" , "firebase" ) != "no"
2021-12-23 23:03:04 +00:00
email = readParam ( r , "x-email" , "x-e-mail" , "email" , "e-mail" , "mail" , "e" )
2021-12-22 08:44:16 +00:00
m . Title = readParam ( r , "x-title" , "title" , "t" )
2021-12-15 14:41:55 +00:00
messageStr := readParam ( r , "x-message" , "message" , "m" )
if messageStr != "" {
m . Message = messageStr
}
2021-12-17 01:33:01 +00:00
m . Priority , err = util . ParsePriority ( readParam ( r , "x-priority" , "priority" , "prio" , "p" ) )
if err != nil {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestPriorityInvalid
2021-11-27 21:12:08 +00:00
}
2021-12-22 08:44:16 +00:00
tagsStr := readParam ( r , "x-tags" , "tags" , "tag" , "ta" )
2021-11-27 21:12:08 +00:00
if tagsStr != "" {
2021-12-10 16:31:42 +00:00
m . Tags = make ( [ ] string , 0 )
2021-12-21 20:22:27 +00:00
for _ , s := range util . SplitNoEmpty ( tagsStr , "," ) {
2021-12-10 16:31:42 +00:00
m . Tags = append ( m . Tags , strings . TrimSpace ( s ) )
2021-12-07 20:39:42 +00:00
}
2021-11-27 21:12:08 +00:00
}
2021-12-15 14:41:55 +00:00
delayStr := readParam ( r , "x-delay" , "delay" , "x-at" , "at" , "x-in" , "in" )
2021-12-11 05:06:25 +00:00
if delayStr != "" {
2021-12-10 16:31:42 +00:00
if ! cache {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestDelayNoCache
2021-12-10 16:31:42 +00:00
}
2021-12-23 23:03:04 +00:00
if email != "" {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
2021-12-23 23:03:04 +00:00
}
2021-12-11 05:06:25 +00:00
delay , err := util . ParseFutureTime ( delayStr , time . Now ( ) )
2021-12-10 16:31:42 +00:00
if err != nil {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestDelayCannotParse
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) < time . Now ( ) . Add ( s . config . MinDelay ) . Unix ( ) {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestDelayTooSmall
2021-12-11 05:06:25 +00:00
} else if delay . Unix ( ) > time . Now ( ) . Add ( s . config . MaxDelay ) . Unix ( ) {
2021-12-25 14:15:05 +00:00
return false , false , "" , errHTTPBadRequestDelayTooLarge
2021-12-10 16:31:42 +00:00
}
2021-12-11 05:06:25 +00:00
m . Time = delay . Unix ( )
2021-12-10 16:31:42 +00:00
}
2021-12-23 20:04:17 +00:00
return cache , firebase , email , nil
2021-11-27 21:12:08 +00:00
}
2021-12-15 14:41:55 +00:00
// readParam returns the first non-empty value found for any of the given
// names, with surrounding whitespace trimmed. All names are looked up as
// request headers first (in the order given); only if no header matches are
// they looked up as query parameters, using the lower-cased name. It returns
// "" if nothing matches.
func readParam(r *http.Request, names ...string) string {
	for _, name := range names {
		if value := r.Header.Get(name); value != "" {
			return strings.TrimSpace(value)
		}
	}
	// URL.Query parses the raw query string on every call, so do it once
	// rather than once per candidate name.
	query := r.URL.Query()
	for _, name := range names {
		if value := query.Get(strings.ToLower(name)); value != "" {
			return strings.TrimSpace(value)
		}
	}
	return ""
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeJSON ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
return "" , err
2021-10-23 01:26:01 +00:00
}
2021-10-27 18:56:17 +00:00
return buf . String ( ) , nil
2021-10-23 01:26:01 +00:00
}
2021-11-07 18:08:03 +00:00
return s . handleSubscribe ( w , r , v , "json" , "application/x-ndjson" , encoder )
2021-10-23 01:26:01 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeSSE ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-10-23 17:21:33 +00:00
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( & msg ) ; err != nil {
2021-10-27 18:56:17 +00:00
return "" , err
2021-10-23 17:21:33 +00:00
}
2021-10-29 12:29:27 +00:00
if msg . Event != messageEvent {
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "event: %s\ndata: %s\n" , msg . Event , buf . String ( ) ) , nil // Browser's .onmessage() does not fire on this!
2021-10-23 17:21:33 +00:00
}
2021-10-27 18:56:17 +00:00
return fmt . Sprintf ( "data: %s\n" , buf . String ( ) ) , nil
2021-10-23 19:22:17 +00:00
}
2021-11-01 19:21:38 +00:00
return s . handleSubscribe ( w , r , v , "sse" , "text/event-stream" , encoder )
2021-10-23 17:21:33 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribeRaw ( w http . ResponseWriter , r * http . Request , v * visitor ) error {
2021-10-27 18:56:17 +00:00
encoder := func ( msg * message ) ( string , error ) {
2021-11-02 18:10:56 +00:00
if msg . Event == messageEvent { // only handle default events
2021-10-27 18:56:17 +00:00
return strings . ReplaceAll ( msg . Message , "\n" , " " ) + "\n" , nil
}
return "\n" , nil // "keepalive" and "open" events just send an empty line
}
2021-11-01 19:21:38 +00:00
return s . handleSubscribe ( w , r , v , "raw" , "text/plain" , encoder )
2021-10-27 18:56:17 +00:00
}
2021-11-01 19:21:38 +00:00
func ( s * Server ) handleSubscribe ( w http . ResponseWriter , r * http . Request , v * visitor , format string , contentType string , encoder messageEncoder ) error {
2021-12-25 14:15:05 +00:00
if err := v . SubscriptionAllowed ( ) ; err != nil {
return errHTTPTooManyRequestsLimitSubscriptions
2021-11-01 19:21:38 +00:00
}
defer v . RemoveSubscription ( )
2021-11-15 12:56:58 +00:00
topicsStr := strings . TrimSuffix ( r . URL . Path [ 1 : ] , "/" + format ) // Hack
2021-12-21 20:22:27 +00:00
topicIDs := util . SplitNoEmpty ( topicsStr , "," )
2021-11-15 12:56:58 +00:00
topics , err := s . topicsFromIDs ( topicIDs ... )
2021-11-01 20:39:40 +00:00
if err != nil {
return err
}
2021-12-22 08:44:16 +00:00
poll := readParam ( r , "x-poll" , "poll" , "po" ) == "1"
scheduled := readParam ( r , "x-scheduled" , "scheduled" , "sched" ) == "1"
since , err := parseSince ( r , poll )
2021-10-29 17:58:14 +00:00
if err != nil {
return err
}
2021-12-21 20:22:27 +00:00
messageFilter , titleFilter , priorityFilter , tagsFilter , err := parseQueryFilters ( r )
if err != nil {
return err
}
2021-12-22 08:44:16 +00:00
var wlock sync . Mutex
2021-10-27 18:56:17 +00:00
sub := func ( msg * message ) error {
2021-12-21 20:22:27 +00:00
if ! passesQueryFilter ( msg , messageFilter , titleFilter , priorityFilter , tagsFilter ) {
return nil
}
2021-10-27 18:56:17 +00:00
m , err := encoder ( msg )
if err != nil {
return err
}
2021-12-21 20:22:27 +00:00
wlock . Lock ( )
defer wlock . Unlock ( )
2021-10-27 18:56:17 +00:00
if _ , err := w . Write ( [ ] byte ( m ) ) ; err != nil {
2021-10-24 01:29:45 +00:00
return err
}
if fl , ok := w . ( http . Flusher ) ; ok {
fl . Flush ( )
}
return nil
2021-10-27 18:56:17 +00:00
}
2021-11-07 18:08:03 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Content-Type" , contentType + "; charset=utf-8" ) // Android/Volley client needs charset!
2021-10-29 17:58:14 +00:00
if poll {
2021-12-10 16:31:42 +00:00
return s . sendOldMessages ( topics , since , scheduled , sub )
2021-10-29 17:58:14 +00:00
}
2021-11-15 12:56:58 +00:00
subscriberIDs := make ( [ ] int , 0 )
for _ , t := range topics {
subscriberIDs = append ( subscriberIDs , t . Subscribe ( sub ) )
}
defer func ( ) {
for i , subscriberID := range subscriberIDs {
topics [ i ] . Unsubscribe ( subscriberID ) // Order!
}
} ( )
if err := sub ( newOpenMessage ( topicsStr ) ) ; err != nil { // Send out open message
2021-10-29 17:58:14 +00:00
return err
}
2021-12-10 16:31:42 +00:00
if err := s . sendOldMessages ( topics , since , scheduled , sub ) ; err != nil {
2021-10-27 18:56:17 +00:00
return err
}
for {
select {
case <- r . Context ( ) . Done ( ) :
return nil
case <- time . After ( s . config . KeepaliveInterval ) :
2021-11-01 19:21:38 +00:00
v . Keepalive ( )
2021-11-15 12:56:58 +00:00
if err := sub ( newKeepaliveMessage ( topicsStr ) ) ; err != nil { // Send keepalive message
2021-10-27 18:56:17 +00:00
return err
}
}
2021-10-24 01:29:45 +00:00
}
}
2021-12-22 12:46:17 +00:00
func parseQueryFilters ( r * http . Request ) ( messageFilter string , titleFilter string , priorityFilter [ ] int , tagsFilter [ ] string , err error ) {
2021-12-22 08:44:16 +00:00
messageFilter = readParam ( r , "x-message" , "message" , "m" )
titleFilter = readParam ( r , "x-title" , "title" , "t" )
tagsFilter = util . SplitNoEmpty ( readParam ( r , "x-tags" , "tags" , "tag" , "ta" ) , "," )
2021-12-22 12:46:17 +00:00
priorityFilter = make ( [ ] int , 0 )
for _ , p := range util . SplitNoEmpty ( readParam ( r , "x-priority" , "priority" , "prio" , "p" ) , "," ) {
priority , err := util . ParsePriority ( p )
if err != nil {
return "" , "" , nil , nil , err
}
priorityFilter = append ( priorityFilter , priority )
}
return
2021-12-21 20:22:27 +00:00
}
2021-12-22 12:46:17 +00:00
func passesQueryFilter ( msg * message , messageFilter string , titleFilter string , priorityFilter [ ] int , tagsFilter [ ] string ) bool {
2021-12-21 20:29:37 +00:00
if msg . Event != messageEvent {
return true // filters only apply to messages
}
2021-12-21 20:22:27 +00:00
if messageFilter != "" && msg . Message != messageFilter {
return false
}
if titleFilter != "" && msg . Title != titleFilter {
return false
}
2021-12-22 08:44:16 +00:00
messagePriority := msg . Priority
if messagePriority == 0 {
messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0)
}
2021-12-22 12:46:17 +00:00
if len ( priorityFilter ) > 0 && ! util . InIntList ( priorityFilter , messagePriority ) {
2021-12-21 20:22:27 +00:00
return false
}
if len ( tagsFilter ) > 0 && ! util . InStringListAll ( msg . Tags , tagsFilter ) {
return false
}
return true
}
2021-12-10 16:31:42 +00:00
func ( s * Server ) sendOldMessages ( topics [ ] * topic , since sinceTime , scheduled bool , sub subscriber ) error {
2021-11-08 14:46:31 +00:00
if since . IsNone ( ) {
2021-10-29 17:58:14 +00:00
return nil
}
2021-11-15 12:56:58 +00:00
for _ , t := range topics {
2021-12-10 16:31:42 +00:00
messages , err := s . cache . Messages ( t . ID , since , scheduled )
2021-11-15 12:56:58 +00:00
if err != nil {
2021-10-29 17:58:14 +00:00
return err
}
2021-11-15 12:56:58 +00:00
for _ , m := range messages {
if err := sub ( m ) ; err != nil {
return err
}
}
2021-10-29 17:58:14 +00:00
}
2021-10-24 17:34:15 +00:00
return nil
}
2021-11-08 14:46:31 +00:00
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
//
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages.
2021-12-22 08:44:16 +00:00
func parseSince ( r * http . Request , poll bool ) ( sinceTime , error ) {
since := readParam ( r , "x-since" , "since" , "si" )
if since == "" {
if poll {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
}
return sinceNoMessages , nil
}
2021-12-22 08:44:16 +00:00
if since == "all" {
2021-11-08 14:46:31 +00:00
return sinceAllMessages , nil
2021-12-22 08:44:16 +00:00
} else if s , err := strconv . ParseInt ( since , 10 , 64 ) ; err == nil {
2021-11-08 14:46:31 +00:00
return sinceTime ( time . Unix ( s , 0 ) ) , nil
2021-12-22 08:44:16 +00:00
} else if d , err := time . ParseDuration ( since ) ; err == nil {
2021-11-08 14:46:31 +00:00
return sinceTime ( time . Now ( ) . Add ( - 1 * d ) ) , nil
2021-10-29 17:58:14 +00:00
}
2021-12-25 14:15:05 +00:00
return sinceNoMessages , errHTTPBadRequestSinceInvalid
2021-10-29 17:58:14 +00:00
}
2021-12-07 16:45:15 +00:00
func ( s * Server ) handleOptions ( w http . ResponseWriter , _ * http . Request ) error {
2021-10-29 17:58:14 +00:00
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" ) // CORS, allow cross-origin requests
w . Header ( ) . Set ( "Access-Control-Allow-Methods" , "GET, PUT, POST" )
2021-10-24 18:22:53 +00:00
return nil
}
2021-12-15 14:41:55 +00:00
func ( s * Server ) topicFromPath ( path string ) ( * topic , error ) {
parts := strings . Split ( path , "/" )
if len ( parts ) < 2 {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicInvalid
2021-12-15 14:41:55 +00:00
}
topics , err := s . topicsFromIDs ( parts [ 1 ] )
2021-11-15 12:56:58 +00:00
if err != nil {
return nil , err
}
return topics [ 0 ] , nil
}
2021-11-27 21:12:08 +00:00
func ( s * Server ) topicsFromIDs ( ids ... string ) ( [ ] * topic , error ) {
2021-10-23 01:26:01 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-11-15 12:56:58 +00:00
topics := make ( [ ] * topic , 0 )
2021-11-27 21:12:08 +00:00
for _ , id := range ids {
2021-12-09 03:13:59 +00:00
if util . InStringList ( disallowedTopics , id ) {
2021-12-25 14:15:05 +00:00
return nil , errHTTPBadRequestTopicDisallowed
2021-12-09 03:13:59 +00:00
}
2021-11-15 12:56:58 +00:00
if _ , ok := s . topics [ id ] ; ! ok {
if len ( s . topics ) >= s . config . GlobalTopicLimit {
2021-12-25 14:15:05 +00:00
return nil , errHTTPTooManyRequestsLimitGlobalTopics
2021-11-15 12:56:58 +00:00
}
2021-12-09 03:57:31 +00:00
s . topics [ id ] = newTopic ( id )
2021-10-29 17:58:14 +00:00
}
2021-11-15 12:56:58 +00:00
topics = append ( topics , s . topics [ id ] )
2021-10-23 01:26:01 +00:00
}
2021-11-15 12:56:58 +00:00
return topics , nil
2021-10-23 01:26:01 +00:00
}
2021-12-11 03:57:01 +00:00
func ( s * Server ) updateStatsAndPrune ( ) {
2021-10-23 01:26:01 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-10-29 17:58:14 +00:00
// Expire visitors from rate visitors map
for ip , v := range s . visitors {
2021-11-01 19:21:38 +00:00
if v . Stale ( ) {
2021-10-29 17:58:14 +00:00
delete ( s . visitors , ip )
}
2021-10-23 01:26:01 +00:00
}
2021-10-24 01:29:45 +00:00
2021-12-11 03:57:01 +00:00
// Prune message cache
2021-12-09 03:57:31 +00:00
olderThan := time . Now ( ) . Add ( - 1 * s . config . CacheDuration )
if err := s . cache . Prune ( olderThan ) ; err != nil {
2021-11-03 01:09:49 +00:00
log . Printf ( "error pruning cache: %s" , err . Error ( ) )
2021-11-02 18:08:21 +00:00
}
2021-12-11 03:57:01 +00:00
// Prune old topics, remove subscriptions without subscribers
2021-11-03 01:09:49 +00:00
var subscribers , messages int
2021-10-29 17:58:14 +00:00
for _ , t := range s . topics {
2021-11-03 01:09:49 +00:00
subs := t . Subscribers ( )
2021-12-09 03:57:31 +00:00
msgs , err := s . cache . MessageCount ( t . ID )
2021-11-03 01:09:49 +00:00
if err != nil {
2021-12-09 03:57:31 +00:00
log . Printf ( "cannot get stats for topic %s: %s" , t . ID , err . Error ( ) )
2021-11-03 01:09:49 +00:00
continue
}
2021-12-09 17:15:17 +00:00
if msgs == 0 && subs == 0 {
2021-12-09 03:57:31 +00:00
delete ( s . topics , t . ID )
2021-11-03 01:09:49 +00:00
continue
2021-10-29 17:58:14 +00:00
}
subscribers += subs
messages += msgs
2021-10-24 01:29:45 +00:00
}
2021-11-03 01:09:49 +00:00
// Print stats
2021-10-29 17:58:14 +00:00
log . Printf ( "Stats: %d message(s) published, %d topic(s) active, %d subscriber(s), %d message(s) buffered, %d visitor(s)" ,
s . messages , len ( s . topics ) , subscribers , messages , len ( s . visitors ) )
2021-10-24 01:29:45 +00:00
}
2021-10-24 02:49:50 +00:00
2021-12-15 14:13:16 +00:00
func ( s * Server ) runManager ( ) {
2021-12-22 13:17:50 +00:00
for {
select {
case <- time . After ( s . config . ManagerInterval ) :
2021-12-15 14:13:16 +00:00
s . updateStatsAndPrune ( )
2021-12-22 13:17:50 +00:00
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
2021-12-22 13:17:50 +00:00
}
2021-12-15 14:13:16 +00:00
}
func ( s * Server ) runAtSender ( ) {
for {
2021-12-22 13:17:50 +00:00
select {
case <- time . After ( s . config . AtSenderInterval ) :
if err := s . sendDelayedMessages ( ) ; err != nil {
log . Printf ( "error sending scheduled messages: %s" , err . Error ( ) )
}
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
}
}
func ( s * Server ) runFirebaseKeepliver ( ) {
if s . firebase == nil {
return
}
for {
2021-12-22 13:17:50 +00:00
select {
case <- time . After ( s . config . FirebaseKeepaliveInterval ) :
if err := s . firebase ( newKeepaliveMessage ( firebaseControlTopic ) ) ; err != nil {
log . Printf ( "error sending Firebase keepalive message: %s" , err . Error ( ) )
}
case <- s . closeChan :
return
2021-12-15 14:13:16 +00:00
}
}
}
2021-12-22 13:17:50 +00:00
2021-12-10 16:31:42 +00:00
func ( s * Server ) sendDelayedMessages ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
messages , err := s . cache . MessagesDue ( )
if err != nil {
return err
}
for _ , m := range messages {
t , ok := s . topics [ m . Topic ] // If no subscribers, just mark message as published
if ok {
if err := t . Publish ( m ) ; err != nil {
log . Printf ( "unable to publish message %s to topic %s: %v" , m . ID , m . Topic , err . Error ( ) )
}
if s . firebase != nil {
if err := s . firebase ( m ) ; err != nil {
log . Printf ( "unable to publish to Firebase: %v" , err . Error ( ) )
}
}
2021-12-23 23:03:04 +00:00
// TODO delayed email sending
2021-12-10 16:31:42 +00:00
}
if err := s . cache . MarkPublished ( m ) ; err != nil {
return err
}
}
return nil
}
2021-11-05 17:46:27 +00:00
func ( s * Server ) withRateLimit ( w http . ResponseWriter , r * http . Request , handler func ( w http . ResponseWriter , r * http . Request , v * visitor ) error ) error {
v := s . visitor ( r )
if err := v . RequestAllowed ( ) ; err != nil {
2021-12-25 14:15:05 +00:00
return errHTTPTooManyRequestsLimitRequests
2021-11-05 17:46:27 +00:00
}
return handler ( w , r , v )
}
2021-10-24 02:49:50 +00:00
// visitor creates or retrieves a rate.Limiter for the given visitor.
// This function was taken from https://www.alexedwards.net/blog/how-to-rate-limit-http-requests (MIT).
2021-11-05 17:46:27 +00:00
func ( s * Server ) visitor ( r * http . Request ) * visitor {
2021-10-24 02:49:50 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-11-05 17:46:27 +00:00
remoteAddr := r . RemoteAddr
2021-10-24 02:49:50 +00:00
ip , _ , err := net . SplitHostPort ( remoteAddr )
if err != nil {
ip = remoteAddr // This should not happen in real life; only in tests.
}
2021-11-05 17:46:27 +00:00
if s . config . BehindProxy && r . Header . Get ( "X-Forwarded-For" ) != "" {
ip = r . Header . Get ( "X-Forwarded-For" )
}
2021-10-24 02:49:50 +00:00
v , exists := s . visitors [ ip ]
if ! exists {
2021-12-24 14:01:29 +00:00
s . visitors [ ip ] = newVisitor ( s . config , ip )
2021-11-01 19:21:38 +00:00
return s . visitors [ ip ]
2021-10-24 02:49:50 +00:00
}
2021-12-22 09:04:59 +00:00
v . Keepalive ( )
2021-10-24 02:49:50 +00:00
return v
}
2021-12-15 21:12:40 +00:00
// inc adds one to the value counter points to while holding the server mutex.
func (s *Server) inc(counter *int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	*counter += 1
}