refactor(rtc): add apis

pull/11293/head
rcmerci 2024-04-30 18:05:18 +08:00
parent 956d470610
commit f642fc675d
3 changed files with 79 additions and 17 deletions

View File

@ -17,6 +17,7 @@
[frontend.worker.handler.page :as worker-page]
[frontend.worker.handler.page.rename :as worker-page-rename]
[frontend.worker.rtc.core :as rtc-core]
[frontend.worker.rtc.core2 :as rtc-core2]
[frontend.worker.rtc.db-listener]
[frontend.worker.rtc.full-upload-download-graph :as rtc-updown]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
@ -552,6 +553,18 @@
(ldb/write-transit-str (worker-export/get-all-page->content repo @conn))))
;; RTC
(rtc-start2
[this repo token]
(rtc-core2/rtc-start repo token))
(rtc-stop2
[this]
(rtc-core2/rtc-stop))
(rtc-toggle-sync2
[this]
(rtc-core2/rtc-toggle-sync))
(rtc-start
[this repo token dev-mode?]
(async-util/c->p

View File

@ -2,12 +2,16 @@
"Main(use missionary) ns for rtc related fns"
(:require [frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.rtc.exception :as r.ex]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.ws2 :as ws]
[frontend.worker.state :as worker-state]
[frontend.worker.util :as worker-util]
[goog.string :as gstring]
[logseq.common.config :as common-config]
[logseq.common.missionary-util :as c.m]
[logseq.db :as ldb]
[malli.core :as ma]
[missionary.core :as m])
(:import [missionary Cancelled]))
@ -52,13 +56,6 @@
(filter (fn [v] (when (pos? (op-mem-layer/get-unpushed-block-update-count repo)) v)))
merge-flow)))
(comment
(def *push (atom true))
(def f (create-local-updates-check-flow *push 2000))
(def cancel ((m/reduce (fn [_ v] (prn :v v) v) f) #(js/console.log :s %) #(js/console.log :f %)))
(reset! *push not)
(cancel))
(defn- create-mixed-flow
"Return a flow that emits all kinds of events:
`:remote-update`: remote-updates data from server
@ -112,35 +109,42 @@
"Use this fn to prevent multiple rtc-loops at same time.
rtc-loop-task is stateless, but conn is not.
we need to ensure that no two concurrent rtc-loop-tasks are modifying `conn` at the same time"
[task]
[started-dfv task]
(m/sp
(when-not (compare-and-set! *rtc-lock nil true)
(throw (ex-info "Must not run multiple rtc-loops, try later" {:missionary/retry true})))
(started-dfv false)
(throw (ex-info "Must not run multiple rtc-loops, try later"
{:type ::lock-failed
:missionary/retry true})))
(started-dfv true)
(try
(m/? task)
(finally
(compare-and-set! *rtc-lock true nil)))))
(defn create-rtc-loop
"Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push?]
(defn- create-rtc-loop
"Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push? :onstarted-task]
TODO: auto refresh token if needed"
[user-uuid graph-uuid repo conn date-formatter token
& {:keys [auto-push? debug-ws-url] :or {auto-push? true}}]
(let [ws-url (or debug-ws-url (get-ws-url token))
*auto-push? (atom auto-push?)
*log (atom nil)
started-dfv (m/dfv)
add-log-fn #(reset! *log [(js/Date.) %])
[*current-mws get-mws-create-task] (create-get-mws-create-task ws-url)
get-mws-create-task (r.client/ensure-register-graph-updates get-mws-create-task graph-uuid)
mixed-flow (create-mixed-flow repo get-mws-create-task *auto-push?)]
{:rtc-log-flow (m/buffer 100 (m/watch *log))
:rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws))
{:rtc-log-flow (m/buffer 100 (m/watch *log))
:rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws))
:*rtc-auto-push? *auto-push?
:onstarted-task started-dfv
:rtc-loop-task
(holding-rtc-lock
started-dfv
(m/sp
(try
;; init run to open a ws
;; init run to open a ws
(m/? get-mws-create-task)
(->>
(let [event (m/?> mixed-flow)]
@ -155,10 +159,50 @@
(m/ap)
(m/reduce {} nil)
(m/?))
(catch Cancelled _
(add-log-fn {:type ::cancelled})))))}))
(catch Cancelled e
(add-log-fn {:type ::cancelled})
(throw e)))))}))
(def ^:private *rtc-loop-metadata
(atom {:rtc-log-flow nil
:rtc-state-flow nil
:*rtc-auto-push? nil
:canceler nil}))
;;; ================ API ================
(defn rtc-start
[repo token]
(if-let [conn (worker-state/get-datascript-conn repo)]
(if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
(let [user-uuid (:sub (worker-util/parse-jwt token))
config (worker-state/get-config repo)
date-formatter (common-config/get-date-formatter config)
{:keys [onstarted-task rtc-log-flow rtc-state-flow *rtc-auto-push? rtc-loop-task]}
(create-rtc-loop user-uuid graph-uuid repo conn date-formatter token)
canceler (rtc-loop-task #(prn :rtc-loop-task-succ %) #(prn :rtc-loop-stopped %))]
(onstarted-task
(fn [succ?]
(prn :start-succ? succ?)
(when succ?
(reset! *rtc-loop-metadata {:rtc-log-flow rtc-log-flow
:rtc-state-flow rtc-state-flow
:*rtc-auto-push? *rtc-auto-push?
:canceler canceler})))
#(prn :started-failed %)))
(throw r.ex/ex-local-not-rtc-graph))
(throw (ex-info "Not found db-conn" {:repo repo}))))
(defn rtc-stop
[]
(when-let [canceler (:canceler @*rtc-loop-metadata)]
(canceler)))
(defn rtc-toggle-sync
[]
(when-let [*auto-push? (:*rtc-auto-push? @*rtc-loop-metadata)]
(swap! *auto-push? not)))
(def send&recv r.client/send&recv)
(comment
(do

View File

@ -9,3 +9,8 @@
(def ex-remote-graph-lock-missing
(ex-info "remote graph lock missing(server error)" {:type ::remote-graph-lock-missing}))
(def ex-local-not-rtc-graph
(ex-info "RTC is not supported for this graph" {:type ::not-rtc-graph}))