mirror of https://github.com/logseq/logseq
enhance(rtc.asset): adjust the call position of assets-sync-loop canceler
parent
06bce3dbd9
commit
98be881cff
|
@ -107,7 +107,11 @@
|
|||
|
||||
(def ^:private asset-change-event-schema
|
||||
[:map-of
|
||||
[:enum :download :upload]
|
||||
[:enum :download :upload
|
||||
;; Why don't need :delete event?
|
||||
;; when remove-block-op sync to server, server will know this asset need to be deleted
|
||||
;; :delete
|
||||
]
|
||||
[:sequential :uuid]])
|
||||
|
||||
(def ^:private asset-change-event-validator (ma/validator asset-change-event-schema))
|
||||
|
|
|
@ -135,25 +135,27 @@
|
|||
TODO: auto refresh token if needed"
|
||||
[graph-uuid repo conn date-formatter token
|
||||
& {:keys [auto-push? debug-ws-url] :or {auto-push? true}}]
|
||||
(let [ws-url (or debug-ws-url (ws-util/get-ws-url token))
|
||||
*auto-push? (atom auto-push?)
|
||||
*last-calibrate-t (atom nil)
|
||||
*online-users (atom nil)
|
||||
started-dfv (m/dfv)
|
||||
add-log-fn (fn [type message]
|
||||
(assert (map? message) message)
|
||||
(rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
|
||||
(let [ws-url (or debug-ws-url (ws-util/get-ws-url token))
|
||||
*auto-push? (atom auto-push?)
|
||||
*last-calibrate-t (atom nil)
|
||||
*online-users (atom nil)
|
||||
*assets-sync-loop-canceler (atom nil)
|
||||
started-dfv (m/dfv)
|
||||
add-log-fn (fn [type message]
|
||||
(assert (map? message) message)
|
||||
(rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
|
||||
{:keys [*current-ws get-ws-create-task]}
|
||||
(new-task--get-ws-create--memoized ws-url)
|
||||
get-ws-create-task (r.client/ensure-register-graph-updates
|
||||
get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
|
||||
mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
|
||||
get-ws-create-task (r.client/ensure-register-graph-updates
|
||||
get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
|
||||
{:keys [assets-sync-loop-task]}
|
||||
(r.asset/create-assets-sync-loop get-ws-create-task graph-uuid conn)
|
||||
mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
|
||||
(assert (some? *current-ws))
|
||||
{:get-ws-create-task get-ws-create-task
|
||||
:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
|
||||
:*rtc-auto-push? *auto-push?
|
||||
:*online-users *online-users
|
||||
:onstarted-task started-dfv
|
||||
{:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
|
||||
:*rtc-auto-push? *auto-push?
|
||||
:*online-users *online-users
|
||||
:onstarted-task started-dfv
|
||||
:rtc-loop-task
|
||||
(holding-rtc-lock
|
||||
started-dfv
|
||||
|
@ -162,6 +164,8 @@
|
|||
;; init run to open a ws
|
||||
(m/? get-ws-create-task)
|
||||
(started-dfv true)
|
||||
(reset! *assets-sync-loop-canceler
|
||||
(c.m/run-task assets-sync-loop-task :assets-sync-loop-task))
|
||||
(->>
|
||||
(let [event (m/?> mixed-flow)]
|
||||
(case (:type event)
|
||||
|
@ -183,6 +187,7 @@
|
|||
(m/reduce {} nil)
|
||||
(m/?))
|
||||
(catch Cancelled e
|
||||
(when @*assets-sync-loop-canceler (@*assets-sync-loop-canceler))
|
||||
(add-log-fn :rtc.log/cancelled {})
|
||||
(throw e)))))}))
|
||||
|
||||
|
@ -206,19 +211,12 @@
|
|||
(let [user-uuid (:sub (worker-util/parse-jwt token))
|
||||
config (worker-state/get-config repo)
|
||||
date-formatter (common-config/get-date-formatter config)
|
||||
{:keys [get-ws-create-task rtc-state-flow
|
||||
*rtc-auto-push? rtc-loop-task *online-users]
|
||||
onstarted-task1 :onstarted-task}
|
||||
{:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task *online-users]
|
||||
onstarted-task :onstarted-task}
|
||||
(create-rtc-loop graph-uuid repo conn date-formatter token)
|
||||
{:keys [assets-sync-loop-task]
|
||||
onstarted-task2 :onstarted-task}
|
||||
(r.asset/create-assets-sync-loop get-ws-create-task graph-uuid conn)
|
||||
canceler1 (c.m/run-task rtc-loop-task :rtc-loop-task)
|
||||
canceler2 (c.m/run-task assets-sync-loop-task :assets-sync-loop-task)
|
||||
canceler #(do (canceler1) (canceler2))
|
||||
start-ex1 (m/? onstarted-task1)
|
||||
start-ex2 (m/? onstarted-task2)]
|
||||
(if-let [start-ex (or (:ex-data start-ex1) (:ex-data start-ex2))]
|
||||
canceler (c.m/run-task rtc-loop-task :rtc-loop-task)
|
||||
start-ex (m/? onstarted-task)]
|
||||
(if-let [start-ex (:ex-data start-ex)]
|
||||
(r.ex/->map start-ex)
|
||||
(do (reset! *rtc-loop-metadata {:repo repo
|
||||
:graph-uuid graph-uuid
|
||||
|
|
Loading…
Reference in New Issue