Replace polling with server sent events

This commit is contained in:
Eduard Urbach 2018-11-07 16:49:49 +09:00
parent e56782d5a3
commit 79da84956e
8 changed files with 108 additions and 117 deletions

View File

@ -38,6 +38,12 @@ func MarkNotificationsAsSeen(ctx *aero.Context) string {
notification.Save() notification.Save()
} }
// Update the counter on all clients
user.BroadcastEvent(&aero.Event{
Name: "notificationCount",
Data: 0,
})
return "ok" return "ok"
} }

View File

@ -4,10 +4,19 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/animenotifier/notify.moe/components/css"
"github.com/animenotifier/notify.moe/components/js"
"github.com/aerogo/aero" "github.com/aerogo/aero"
"github.com/animenotifier/notify.moe/utils" "github.com/animenotifier/notify.moe/utils"
) )
var (
scriptsETag = aero.ETagString(js.Bundle())
stylesETag = aero.ETagString(css.Bundle())
streams = map[string][]*aero.EventStream{}
)
// Events streams server events to the client. // Events streams server events to the client.
func Events(ctx *aero.Context) string { func Events(ctx *aero.Context) string {
user := utils.GetUser(ctx) user := utils.GetUser(ctx)
@ -17,26 +26,47 @@ func Events(ctx *aero.Context) string {
} }
fmt.Println(user.Nick, "receiving live events") fmt.Println(user.Nick, "receiving live events")
stream := aero.NewEventStream()
events := make(chan *aero.Event) user.AddEventStream(stream)
disconnected := make(chan struct{})
go func() { go func() {
defer fmt.Println(user.Nick, "disconnected, stop sending events") defer fmt.Println(user.Nick, "disconnected, stop sending events")
stream.Events <- &aero.Event{
Name: "etag",
Data: struct {
URL string `json:"url"`
ETag string `json:"etag"`
}{
URL: "/scripts",
ETag: scriptsETag,
},
}
stream.Events <- &aero.Event{
Name: "etag",
Data: struct {
URL string `json:"url"`
ETag string `json:"etag"`
}{
URL: "/styles",
ETag: stylesETag,
},
}
for { for {
select { select {
case <-disconnected: case <-stream.Closed:
close(events) user.RemoveEventStream(stream)
return return
// case <-time.After(10 * time.Second): // case <-time.After(10 * time.Second):
// events <- &aero.Event{ // stream.Events <- &aero.Event{
// Name: "ping", // Name: "ping",
// } // }
} }
} }
}() }()
return ctx.EventStream(events, disconnected) return ctx.EventStream(stream)
} }

View File

@ -31,18 +31,6 @@ export async function markNotificationsAsSeen(arn: AnimeNotifier) {
credentials: "same-origin" credentials: "same-origin"
}) })
// Update notification counter
if("serviceWorker" in navigator) {
// If we have service worker support, broadcast the "notifications marked as seen" message to all open tabs
arn.serviceWorkerManager.postMessage({
type: "broadcast",
realType: "notifications marked as seen"
})
} else {
// If there is no service worker, update the counter on this tab
arn.notificationManager.update()
}
// Update notifications // Update notifications
arn.reloadContent() arn.reloadContent()
} }

View File

@ -10,7 +10,6 @@ import SideBar from "./SideBar"
import InfiniteScroller from "./InfiniteScroller" import InfiniteScroller from "./InfiniteScroller"
import ServiceWorkerManager from "./ServiceWorkerManager" import ServiceWorkerManager from "./ServiceWorkerManager"
import ServerEvents from "./ServerEvents" import ServerEvents from "./ServerEvents"
import { checkNewVersionDelayed } from "./NewVersionCheck"
import { displayAiringDate, displayDate, displayTime } from "./DateView" import { displayAiringDate, displayDate, displayTime } from "./DateView"
import { findAll, canUseWebP, requestIdleCallback, swapElements, delay, findAllInside } from "./Utils" import { findAll, canUseWebP, requestIdleCallback, swapElements, delay, findAllInside } from "./Utils"
import * as actions from "./Actions" import * as actions from "./Actions"
@ -209,18 +208,11 @@ export default class AnimeNotifier {
// Notification manager // Notification manager
if(this.user) { if(this.user) {
this.notificationManager.update() this.notificationManager.update()
// Periodically check notifications
setInterval(() => this.notificationManager.update(), 300000)
} }
// Bind unload event // Bind unload event
window.addEventListener("beforeunload", this.onBeforeUnload.bind(this)) window.addEventListener("beforeunload", this.onBeforeUnload.bind(this))
// Periodically check etags of scripts and styles to let the user know about page updates
checkNewVersionDelayed("/scripts", this.statusMessage)
checkNewVersionDelayed("/styles", this.statusMessage)
// Show microphone icon if speech input is available // Show microphone icon if speech input is available
if(window["SpeechRecognition"] || window["webkitSpeechRecognition"]) { if(window["SpeechRecognition"] || window["webkitSpeechRecognition"]) {
document.getElementsByClassName("speech-input")[0].classList.add("speech-input-available") document.getElementsByClassName("speech-input")[0].classList.add("speech-input-available")
@ -239,7 +231,7 @@ export default class AnimeNotifier {
// Server sent events // Server sent events
if(this.user && EventSource) { if(this.user && EventSource) {
this.serverEvents = new ServerEvents() this.serverEvents = new ServerEvents(this)
} }
// // Download popular anime titles for the search // // Download popular anime titles for the search

View File

@ -1,61 +0,0 @@
import { delay, requestIdleCallback } from "./Utils"
import StatusMessage from "./StatusMessage"
const newVersionCheckDelay = location.hostname === "notify.moe" ? 60000 : 3000
let etags = new Map<string, string>()
let hasNewVersion = false
async function checkNewVersion(url: string, statusMessage: StatusMessage) {
if(hasNewVersion) {
return
}
try {
let headers = {}
if(etags.has(url)) {
headers["If-None-Match"] = etags.get(url)
}
let response = await fetch(url, {
headers,
credentials: "omit"
})
// Not modified response
if(response.status === 304) {
return
}
if(!response.ok) {
console.warn("Error fetching", url, response.status)
return
}
let newETag = response.headers.get("ETag")
let oldETag = etags.get(url)
if(newETag) {
etags.set(url, newETag)
}
if(oldETag && newETag && oldETag !== newETag) {
statusMessage.showInfo("A new version of the website is available. Please refresh the page.", -1)
// Do not check for new versions again.
hasNewVersion = true
return
}
} catch(err) {
console.warn("Error fetching", url + "\n", err)
} finally {
checkNewVersionDelayed(url, statusMessage)
}
}
export function checkNewVersionDelayed(url: string, statusMessage: StatusMessage) {
return delay(newVersionCheckDelay).then(() => {
requestIdleCallback(() => checkNewVersion(url, statusMessage))
})
}

View File

@ -16,7 +16,11 @@ export default class NotificationManager {
}) })
let body = await response.text() let body = await response.text()
this.unseen = parseInt(body) this.setCounter(parseInt(body))
}
setCounter(unseen: number) {
this.unseen = unseen
if(isNaN(this.unseen)) { if(isNaN(this.unseen)) {
this.unseen = 0 this.unseen = 0

View File

@ -1,3 +1,7 @@
import AnimeNotifier from "./AnimeNotifier"
const reconnectDelay = 3000
class ServerEvent { class ServerEvent {
data: string data: string
} }
@ -5,22 +9,56 @@ class ServerEvent {
export default class ServerEvents { export default class ServerEvents {
supported: boolean supported: boolean
eventSource: EventSource eventSource: EventSource
arn: AnimeNotifier
etags: Map<string, string>
constructor() { constructor(arn: AnimeNotifier) {
this.supported = ("EventSource" in window) this.supported = ("EventSource" in window)
if(!this.supported) { if(!this.supported) {
return return
} }
this.arn = arn
this.etags = new Map<string, string>()
this.connect()
}
connect() {
if(this.eventSource) {
this.eventSource.close()
}
this.eventSource = new EventSource("/api/sse/events", { this.eventSource = new EventSource("/api/sse/events", {
withCredentials: true withCredentials: true
}) })
this.eventSource.addEventListener("ping", (e: any) => this.ping(e)) this.eventSource.addEventListener("ping", (e: any) => this.ping(e))
this.eventSource.addEventListener("etag", (e: any) => this.etag(e))
this.eventSource.addEventListener("notificationCount", (e: any) => this.notificationCount(e))
this.eventSource.onerror = () => {
setTimeout(() => this.connect(), reconnectDelay)
}
} }
ping(e: ServerEvent) { ping(e: ServerEvent) {
console.log("sse: ping") console.log("ping")
}
etag(e: ServerEvent) {
let data = JSON.parse(e.data)
let oldETag = this.etags.get(data.url)
let newETag = data.etag
if(oldETag && newETag && oldETag != newETag) {
this.arn.statusMessage.showInfo("A new version of the website is available. Please refresh the page.", -1)
}
this.etags.set(data.url, newETag)
}
notificationCount(e: ServerEvent) {
this.arn.notificationManager.setCounter(parseInt(e.data))
} }
} }

View File

@ -31,7 +31,6 @@ export default class ServiceWorkerManager {
// A reloadContent call should never trigger another reload // A reloadContent call should never trigger another reload
if(this.arn.app.currentPath === this.arn.lastReloadContentPath) { if(this.arn.app.currentPath === this.arn.lastReloadContentPath) {
console.log("reload finished.")
this.arn.lastReloadContentPath = "" this.arn.lastReloadContentPath = ""
return return
} }
@ -68,32 +67,27 @@ export default class ServiceWorkerManager {
} }
onMessage(evt: MessageEvent) { onMessage(evt: MessageEvent) {
let message = JSON.parse(evt.data) // let message = JSON.parse(evt.data)
switch(message.type) { // switch(message.type) {
case "new notification": // case "new content":
case "notifications marked as seen": // if(message.url.includes("/_/")) {
this.arn.notificationManager.update() // // Content reload
break // this.arn.contentLoadedActions.then(() => {
// this.arn.reloadContent(true)
// })
// } else {
// // Full page reload
// this.arn.contentLoadedActions.then(() => {
// this.arn.reloadPage()
// })
// }
case "new content": // break
if(message.url.includes("/_/")) {
// Content reload
this.arn.contentLoadedActions.then(() => {
this.arn.reloadContent(true)
})
} else {
// Full page reload
this.arn.contentLoadedActions.then(() => {
this.arn.reloadPage()
})
}
break // // case "offline":
// // this.arn.statusMessage.showError("You are viewing an offline version of the site now.")
// case "offline": // // break
// this.arn.statusMessage.showError("You are viewing an offline version of the site now.") // }
// break
}
} }
} }