refactor(rtc): go through the process of missionary-rtc-loop and

fix several bugs
pull/11293/head
rcmerci 2024-04-30 15:29:10 +08:00
parent 9c2b205027
commit d39fb0782b
3 changed files with 86 additions and 45 deletions

View File

@ -24,8 +24,10 @@
"Return a task: throw exception if recv ex-data response" "Return a task: throw exception if recv ex-data response"
[get-mws-create-task message] [get-mws-create-task message]
(m/sp (m/sp
(handle-remote-ex (let [mws (m/? get-mws-create-task)]
(m/? (ws/send&recv get-mws-create-task message))))) (handle-remote-ex
(rtc-const/data-from-ws-coercer
(m/? (ws/send&recv mws message)))))))
(defn- register-graph-updates (defn- register-graph-updates
[get-mws-create-task graph-uuid] [get-mws-create-task graph-uuid]
@ -38,14 +40,15 @@
But ensure `register-graph-updates` has been sent" But ensure `register-graph-updates` has been sent"
[get-mws-create-task graph-uuid] [get-mws-create-task graph-uuid]
(assert (some? graph-uuid)) (assert (some? graph-uuid))
(let [*sent (atom {})] (let [*sent (atom {}) ;; mws->bool
]
(m/sp (m/sp
(let [mws (m/? get-mws-create-task)] (let [mws (m/? get-mws-create-task)]
(when (contains? @*sent mws) (when-not (contains? @*sent mws)
(swap! *sent mws false)) (swap! *sent assoc mws false))
(when (not (@*sent mws)) (when (not (@*sent mws))
(m/? (register-graph-updates (m/sp mws) graph-uuid)) (m/? (register-graph-updates get-mws-create-task graph-uuid))
(swap! *sent mws true)) (swap! *sent assoc mws true))
mws)))) mws))))
(defn- remove-non-exist-block-uuids-in-add-retract-map (defn- remove-non-exist-block-uuids-in-add-retract-map
@ -328,5 +331,6 @@
(do (assert (pos? (:t r)) r) (do (assert (pos? (:t r)) r)
(op-mem-layer/commit! repo) (op-mem-layer/commit! repo)
(r.remote-update/apply-remote-update repo conn date-formatter r add-log-fn) (r.remote-update/apply-remote-update
repo conn date-formatter {:type :remote-update :value r} add-log-fn)
(add-log-fn {:type ::push-client-updates :remote-t (:t r)}))))))) (add-log-fn {:type ::push-client-updates :remote-t (:t r)})))))))

View File

@ -1,17 +1,20 @@
(ns frontend.worker.rtc.core2 (ns frontend.worker.rtc.core2
"Main(use missionary) ns for rtc related fns" "Main(use missionary) ns for rtc related fns"
(:require [frontend.worker.rtc.client :as r.client] (:require [frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update] [frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.ws2 :as ws] [frontend.worker.rtc.ws2 :as ws]
[frontend.worker.state :as worker-state] [frontend.worker.state :as worker-state]
[goog.string :as gstring] [goog.string :as gstring]
[logseq.common.missionary-util :as c.m] [logseq.common.missionary-util :as c.m]
[malli.core :as ma] [malli.core :as ma]
[missionary.core :as m])) [missionary.core :as m])
(:import [missionary Cancelled]))
(def ^:private rtc-state-schema (def ^:private rtc-state-schema
[:map [:map
[:ws-state [:enum :connecting :open :closing :closed]]]) [:ws-state {:optional true} [:enum :connecting :open :closing :closed]]])
(def ^:private rtc-state-validator (ma/validator rtc-state-schema)) (def ^:private rtc-state-validator (ma/validator rtc-state-schema))
(defn- get-ws-url (defn- get-ws-url
@ -28,7 +31,9 @@
(let [mws (m/? get-mws-create-task) (let [mws (m/? get-mws-create-task)
x (try x (try
(m/?> (m/eduction (m/?> (m/eduction
(filter (fn [data] (= "push-updates" (:req-id data)))) (keep (fn [data]
(when (= "push-updates" (:req-id data))
(rtc-const/data-from-ws-coercer data))))
(ws/recv-flow mws))) (ws/recv-flow mws)))
(catch js/CloseEvent _ (catch js/CloseEvent _
sentinel))] sentinel))]
@ -37,13 +42,14 @@
(m/amb x (recur)))))))) (m/amb x (recur))))))))
(defn- create-local-updates-check-flow (defn- create-local-updates-check-flow
"Return a flow" "Return a flow: emit if need to push local-updates"
[*auto-push? interval-ms] [repo *auto-push? interval-ms]
(let [auto-push-flow (m/watch *auto-push?) (let [auto-push-flow (m/watch *auto-push?)
clock-flow (c.m/clock interval-ms :clock) clock-flow (c.m/clock interval-ms :clock)
merge-flow (m/latest vector auto-push-flow clock-flow)] merge-flow (m/latest vector auto-push-flow clock-flow)]
(m/eduction (filter first) (m/eduction (filter first)
(map second) (map second)
(filter (fn [v] (when (pos? (op-mem-layer/get-unpushed-block-update-count repo)) v)))
merge-flow))) merge-flow)))
(comment (comment
@ -57,13 +63,13 @@
"Return a flow that emits all kinds of events: "Return a flow that emits all kinds of events:
`:remote-update`: remote-updates data from server `:remote-update`: remote-updates data from server
`:local-update-check`: event to notify to check if there're some new local-updates, then push to remote." `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote."
[get-mws-create-task *auto-push?] [repo get-mws-create-task *auto-push?]
(let [remote-updates-flow (m/eduction (let [remote-updates-flow (m/eduction
(map (fn [data] {:type :remote-update :value data})) (map (fn [data] {:type :remote-update :value data}))
(get-remote-updates get-mws-create-task)) (get-remote-updates get-mws-create-task))
local-updates-check-flow (m/eduction local-updates-check-flow (m/eduction
(map (fn [data] {:type :local-update-check :value data})) (map (fn [data] {:type :local-update-check :value data}))
(create-local-updates-check-flow *auto-push? 2000))] (create-local-updates-check-flow repo *auto-push? 2000))]
(c.m/mix remote-updates-flow local-updates-check-flow))) (c.m/mix remote-updates-flow local-updates-check-flow)))
(defn- create-get-mws-create-task (defn- create-get-mws-create-task
@ -86,18 +92,20 @@
(defn- create-mws-state-flow (defn- create-mws-state-flow
[*current-mws] [*current-mws]
(m/ap (m/relieve
(if-let [mws (m/?< (m/watch *current-mws))] (m/ap
(m/?< (ws/create-mws-state-flow mws)) (if-let [mws (m/?< (m/watch *current-mws))]
(m/amb)))) (m/?< (ws/create-mws-state-flow mws))
(m/amb)))))
(defn- create-rtc-state-flow (defn- create-rtc-state-flow
[mws-state-flow] [mws-state-flow]
(m/latest (m/latest
(fn [mws-state] (fn [mws-state]
{:post [(rtc-state-validator %)]} {:post [(rtc-state-validator %)]}
{:ws-state mws-state}) (cond-> {}
mws-state-flow)) mws-state (assoc :ws-state mws-state)))
(m/reductions {} nil mws-state-flow)))
(defn create-rtc-loop (defn create-rtc-loop
"Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push?] "Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push?]
@ -110,26 +118,50 @@
add-log-fn #(reset! *log [(js/Date.) %]) add-log-fn #(reset! *log [(js/Date.) %])
[*current-mws get-mws-create-task] (create-get-mws-create-task ws-url) [*current-mws get-mws-create-task] (create-get-mws-create-task ws-url)
get-mws-create-task (r.client/ensure-register-graph-updates get-mws-create-task graph-uuid) get-mws-create-task (r.client/ensure-register-graph-updates get-mws-create-task graph-uuid)
mixed-flow (create-mixed-flow get-mws-create-task *auto-push?)] mixed-flow (create-mixed-flow repo get-mws-create-task *auto-push?)]
{:rtc-log-flow (m/buffer 100 (m/watch *log)) {:rtc-log-flow (m/buffer 100 (m/watch *log))
:rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws)) :rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws))
:*rtc-auto-push? *auto-push? :*rtc-auto-push? *auto-push?
:rtc-loop-task :rtc-loop-task
(m/sp (m/sp
;; init run to open a ws (try
(m/? get-mws-create-task) ;; init run to open a ws
(->> (m/? get-mws-create-task)
(let [event (m/?> mixed-flow)] (->>
(case (:type event) (let [event (m/?> mixed-flow)]
:remote-update (case (:type event)
(r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn) :remote-update
(r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn)
:local-update-check :local-update-check
(m/? (r.client/create-push-local-ops-task (m/? (r.client/create-push-local-ops-task
repo conn user-uuid graph-uuid date-formatter repo conn user-uuid graph-uuid date-formatter
get-mws-create-task add-log-fn)))) get-mws-create-task add-log-fn))))
(m/ap) (m/ap)
(m/reduce {}) (m/reduce {} nil)
(m/?)))})) (m/?))
(catch Cancelled _
(add-log-fn {:type ::cancelled}))))}))
(def send&recv r.client/send&recv) (def send&recv r.client/send&recv)
(comment
(do
(def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
(def graph-uuid "ff7186c1-5903-4bc8-b4e9-ca23525b9983")
(def repo "logseq_db_4-23")
(def conn (worker-state/get-datascript-conn repo))
(def date-formatter "MMM do, yyyy")
(def debug-ws-url "wss://ws-dev.logseq.com/rtc-sync?token=???")
(let [{:keys [rtc-log-flow rtc-state-flow *rtc-auto-push? rtc-loop-task]}
(create-rtc-loop user-uuid graph-uuid repo conn date-formatter nil {:debug-ws-url debug-ws-url})
c (rtc-loop-task #(js/console.log :succ %) #(js/console.log :fail %))]
(def cancel c)
(def rtc-log-flow rtc-log-flow)
(def rtc-state-flow rtc-state-flow)
(def *rtc-auto-push? *rtc-auto-push?)))
(cancel)
(def cancel2 ((m/reduce (fn [_ v] (prn :v v) v)
(m/latest vector rtc-state-flow (m/reductions {} nil rtc-log-flow)))
#(js/console.log :succ %) #(js/console.log :fail %)))
(cancel2))

View File

@ -171,17 +171,22 @@
(:recv-flow m-ws))) (:recv-flow m-ws)))
(defn send&recv (defn send&recv
"Return a task: send message wait to recv its response and return it" "Return a task: send message wait to recv its response and return it.
Throw if timeout"
[mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}] [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
(assert (pos-int? timeout-ms)) (assert (pos-int? timeout-ms))
(let [req-id (str (random-uuid)) (let [req-id (str (random-uuid))
message (assoc message :req-id req-id)] message (assoc message :req-id req-id)]
(m/sp (m/sp
(let [mws (m/? (send mws message))] (let [mws (m/? (send mws message))
(m/? (m/timeout result (m/?
(m/reduce (m/timeout
(fn [_ v] (m/reduce
(when (= req-id (:req-id v)) (fn [_ v]
(reduced v))) (when (= req-id (:req-id v))
(recv-flow mws)) (reduced v)))
timeout-ms)))))) (recv-flow mws))
timeout-ms))]
(when-not result
(throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true})))
result))))