refactor(rtc): replace origin rtc-start&stop

pull/11293/head
rcmerci 2024-05-01 19:11:41 +08:00
parent a417c43029
commit 2e67aed55d
7 changed files with 128 additions and 59 deletions

View File

@ -31,7 +31,10 @@
:db/id (:db/id entity)))))
(merge (cljs-bean.transit/writer-handlers)))]
(fn write-transit-str* [o]
(transit/write (transit/writer :json {:handlers write-handlers}) o))))
(try (transit/write (transit/writer :json {:handlers write-handlers}) o)
(catch :default e
(prn ::write-transit-str o)
(throw e))))))
(def read-transit-str
(let [read-handlers (assoc dt/read-handlers
@ -96,4 +99,4 @@
{:block/name (common-util/page-name-sanity-lc page-name)
:block/original-name page-name
:block/journal? false
:block/uuid (d/squuid)}))
:block/uuid (d/squuid)}))

View File

@ -18,7 +18,7 @@
(defn- stop
[]
(let [^object worker @db-browser/*worker]
(.rtc-stop worker))
(.rtc-stop2 worker))
(reset! debug-state nil))
(defn- push-pending-ops
@ -100,8 +100,7 @@
:on-click (fn []
(let [token (state/get-auth-id-token)
^object worker @db-browser/*worker]
(.rtc-start worker (state/get-current-repo) token
(state/sub [:ui/developer-mode?]))))}
(.rtc-start2 worker (state/get-current-repo) token)))}
(shui/tabler-icon "player-play") "start")
[:div.my-2.flex

View File

@ -248,6 +248,9 @@
[repo]
(worker-state/get-sqlite-conn repo {:search? true}))
(defn- with-write-transit-str
[p]
(p/chain p ldb/write-transit-str))
#_:clj-kondo/ignore
(defclass DBWorker
@ -555,7 +558,8 @@
;; RTC
(rtc-start2
[this repo token]
(rtc-core2/rtc-start repo token))
(with-write-transit-str
(js/Promise. (rtc-core2/create-rtc-start-task repo token))))
(rtc-stop2
[this]
@ -565,6 +569,38 @@
[this]
(rtc-core2/rtc-toggle-auto-push))
(rtc-grant-graph-access2
[this token graph-uuid target-user-uuids-str target-user-emails-str]
(let [target-user-uuids (ldb/read-transit-str target-user-uuids-str)
target-user-emails (ldb/read-transit-str target-user-emails-str)]
(with-write-transit-str
(js/Promise.
(rtc-core2/create-grant-access-to-others-task token graph-uuid
:target-user-uuids target-user-uuids
:target-user-emails target-user-emails)))))
(rtc-get-graphs2
[this token]
(with-write-transit-str
(js/Promise. (rtc-core2/create-get-graphs-task token))))
(rtc-delete-graph2
[this token graph-uuid]
(with-write-transit-str
(js/Promise. (rtc-core2/create-delete-graph-task token graph-uuid))))
(rtc-get-users-info2
[this token graph-uuid]
(with-write-transit-str
(js/Promise. (rtc-core2/create-get-user-info-task token graph-uuid))))
(rtc-get-block-content-versions2
[this token graph-uuid block-uuid]
(with-write-transit-str
(js/Promise. (rtc-core2/create-get-block-content-versions-task token graph-uuid block-uuid))))
;; ================================================================
(rtc-start
[this repo token dev-mode?]
(async-util/c->p

View File

@ -50,29 +50,26 @@
(defn <rtc-stop!
[]
(when-let [^js worker @state/*db-worker]
(.rtc-stop worker)))
(.rtc-stop2 worker)))
(defn <rtc-start!
[repo & {:keys [retry] :or {retry 0}}]
[repo]
(when-let [^js worker @state/*db-worker]
(when (ldb/get-graph-rtc-uuid (db/get-db repo))
(user-handler/<wrap-ensure-id&access-token
;; TODO: `<rtc-stop!` can return a chan so that we can remove timeout
(<rtc-stop!)
(let [token (state/get-auth-id-token)]
(p/let [result (.rtc-start worker repo token (state/sub [:ui/developer-mode?]))
_ (case result
"rtc-not-closed-yet"
(js/setTimeout #(<rtc-start! repo) 200)
":graph-not-ready"
(when (< retry 3)
(let [delay (* 2000 (inc retry))]
(prn "graph still creating, retry rtc-start in " delay "ms")
(p/do! (p/delay delay)
(<rtc-start! repo :retry (inc retry)))))
(p/let [result (.rtc-start2 worker repo token)
start-ex (ldb/read-transit-str result)
_ (case (:type (:ex-data start-ex))
(:rtc.exception/not-rtc-graph
:rtc.exception/not-found-db-conn)
(notification/show! (:ex-message start-ex) :error)
:rtc.exception/lock-failed
(js/setTimeout #(<rtc-start! repo) 1000)
(":break-rtc-loop" ":stop-rtc-loop")
nil
;; else
nil)]
nil))))))

View File

@ -8,6 +8,7 @@
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.ws2 :as ws]
[logseq.common.missionary-util :as c.m]
[missionary.core :as m]))
(def ^:private transit-w (transit/writer :json))
@ -31,8 +32,12 @@
(defn- register-graph-updates
[get-mws-create-task graph-uuid]
(send&recv get-mws-create-task {:action "register-graph-updates"
:graph-uuid graph-uuid}))
(m/sp
(let [{:keys [ex-data]}
(m/? (send&recv get-mws-create-task {:action "register-graph-updates"
:graph-uuid graph-uuid}))]
(when (= :graph-not-ready (:type ex-data))
(throw (ex-info "remote graph is still creating" {:missionary/retry true}))))))
(defn ensure-register-graph-updates
"Return a task: get or create a mws(missionary wrapped websocket).
@ -47,7 +52,9 @@
(when-not (contains? @*sent mws)
(swap! *sent assoc mws false))
(when (not (@*sent mws))
(m/? (register-graph-updates get-mws-create-task graph-uuid))
(m/? (c.m/backoff
(take 7 c.m/delays) ;retry 7 times (128s) if remote-graph is creating
(register-graph-updates get-mws-create-task graph-uuid)))
(swap! *sent assoc mws true))
mws))))

View File

@ -116,11 +116,11 @@
[started-dfv task]
(m/sp
(when-not (compare-and-set! *rtc-lock nil true)
(started-dfv false)
(throw (ex-info "Must not run multiple rtc-loops, try later"
{:type ::lock-failed
:missionary/retry true})))
(started-dfv true)
(let [e (ex-info "Must not run multiple rtc-loops, try later"
{:type :rtc.exception/lock-failed
:missionary/retry true})]
(started-dfv e)
(throw e)))
(try
(m/? task)
(finally
@ -153,6 +153,7 @@
(try
;; init run to open a ws
(m/? get-ws-create-task)
(started-dfv true)
(->>
(let [event (m/?> mixed-flow)]
(case (:type event)
@ -167,7 +168,7 @@
(m/reduce {} nil)
(m/?))
(catch Cancelled e
(add-log-fn {:type ::cancelled})
(add-log-fn {:type :rtc/cancelled})
(throw e)))))}))
(def ^:private *rtc-loop-metadata
@ -177,27 +178,28 @@
:canceler nil}))
;;; ================ API ================
(defn rtc-start
(defn create-rtc-start-task
[repo token]
(if-let [conn (worker-state/get-datascript-conn repo)]
(if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
(let [user-uuid (:sub (worker-util/parse-jwt token))
config (worker-state/get-config repo)
date-formatter (common-config/get-date-formatter config)
{:keys [onstarted-task rtc-log-flow rtc-state-flow *rtc-auto-push? rtc-loop-task]}
(create-rtc-loop user-uuid graph-uuid repo conn date-formatter token)
canceler (rtc-loop-task #(prn :rtc-loop-task-succ %) #(prn :rtc-loop-stopped %))]
(onstarted-task
(fn [succ?]
(prn :start-succ? succ?)
(when succ?
(reset! *rtc-loop-metadata {:rtc-log-flow rtc-log-flow
:rtc-state-flow rtc-state-flow
:*rtc-auto-push? *rtc-auto-push?
:canceler canceler})))
#(prn :started-failed %)))
(throw r.ex/ex-local-not-rtc-graph))
(throw (ex-info "Not found db-conn" {:repo repo}))))
(m/sp
(if-let [conn (worker-state/get-datascript-conn repo)]
(if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
(let [user-uuid (:sub (worker-util/parse-jwt token))
config (worker-state/get-config repo)
date-formatter (common-config/get-date-formatter config)
{:keys [onstarted-task rtc-log-flow rtc-state-flow *rtc-auto-push? rtc-loop-task]}
(create-rtc-loop user-uuid graph-uuid repo conn date-formatter token)
canceler (rtc-loop-task #(prn :rtc-loop-task-succ %) #(prn :rtc-loop-stopped %))
start-ex (m/? onstarted-task)]
(if (:ex-data start-ex)
(r.ex/->map start-ex)
(do (reset! *rtc-loop-metadata {:rtc-log-flow rtc-log-flow
:rtc-state-flow rtc-state-flow
:*rtc-auto-push? *rtc-auto-push?
:canceler canceler})
nil)))
(r.ex/->map r.ex/ex-local-not-rtc-graph))
(r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
:repo repo})))))
(defn rtc-stop
[]
@ -209,7 +211,6 @@
(when-let [*auto-push? (:*rtc-auto-push? @*rtc-loop-metadata)]
(swap! *auto-push? not)))
(defn create-get-graphs-task
[token]
(m/sp
@ -246,7 +247,7 @@
target-user-uuids (assoc :target-user-uuids target-user-uuids)
target-user-emails (assoc :target-user-emails target-user-emails)))))
(defn create-get-block-content-task
(defn create-get-block-content-versions-task
"Return a task that return map [:ex-data :ex-message :versions]"
[token graph-uuid block-uuid]
(let [get-ws-create-task (create-get-ws-create-task (get-ws-url token))]
@ -256,7 +257,6 @@
:graph-uuid graph-uuid})))
(comment
(do
(def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")

View File

@ -1,16 +1,43 @@
(ns frontend.worker.rtc.exception
"Exception list")
"Exception list"
(:require [frontend.schema-register :as sr]))
(sr/defkeyword :rtc.exception/remote-graph-not-exist
"Remote exception. e.g. push client-updates to a deleted graph.")
(sr/defkeyword :rtc.exception/remote-graph-not-ready
"Remote exception. Remote graph is still creating.")
(sr/defkeyword :rtc.exception/remote-graph-lock-missing
"Remote exception. Failed to remote graph lock isn't exist.
It's a server internal error, shouldn't happen.")
(sr/defkeyword :rtc.exception/not-rtc-graph
"Local exception. Trying to start rtc loop on a local-graph.")
(sr/defkeyword :rtc.exception/lock-failed
"Local exception.
Trying to start rtc loop but there's already one running, need to cancel that one first.")
(sr/defkeyword :rtc.exception/not-found-db-conn
"Local exception. Cannot find db-conn by repo")
(def ex-remote-graph-not-exist
(ex-info "remote graph not exist" {:type ::remote-graph-not-exist}))
(ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist}))
(def ex-remote-graph-not-ready
(ex-info "remote graph still creating" {:type ::remote-graph-not-ready}))
(ex-info "remote graph still creating" {:type :rtc.exception/remote-graph-not-ready}))
(def ex-remote-graph-lock-missing
(ex-info "remote graph lock missing(server error)" {:type ::remote-graph-lock-missing}))
(ex-info "remote graph lock missing(server internal error)"
{:type :rtc.exception/remote-graph-lock-missing}))
(def ex-local-not-rtc-graph
(ex-info "RTC is not supported for this graph" {:type ::not-rtc-graph}))
(ex-info "RTC is not supported for this local-graph" {:type :rtc.exception/not-rtc-graph}))
(defn ->map
[e]
(when-let [data (ex-data e)]
{:ex-data data
:ex-message (ex-message e)}))