mirror of https://github.com/logseq/logseq
refactor(rtc): remove :counter in state
parent
dd03dbe473
commit
086af03127
|
@ -87,7 +87,6 @@
|
|||
[:toggle-auto-push-client-ops-chan :any]
|
||||
[:*auto-push-client-ops? :any]
|
||||
[:force-push-client-ops-chan :any]
|
||||
[:counter :any]
|
||||
[:dev-mode? :boolean]
|
||||
[:*block-update-log :any]])
|
||||
|
||||
|
@ -970,6 +969,8 @@
|
|||
(when-let [*rtc-state (:*rtc-state state)]
|
||||
(reset! *rtc-state :closed)))
|
||||
|
||||
(declare notify-main-thread)
|
||||
|
||||
(defn <loop-for-rtc
|
||||
":loop-started-ch used to notify that rtc-loop started"
|
||||
[state graph-uuid repo conn date-formatter & {:keys [loop-started-ch token]}]
|
||||
|
@ -980,10 +981,7 @@
|
|||
(reset! (:*repo state) repo)
|
||||
(reset! (:*db-conn state) conn)
|
||||
(reset! (:*date-formatter state) date-formatter)
|
||||
(add-watch (:*rtc-state state)
|
||||
:update-rtc-state
|
||||
(fn [_ _ _ _new]
|
||||
(swap! *state update :counter (fnil inc 0))))
|
||||
(add-watch (:*rtc-state state) :update-rtc-state #(notify-main-thread state))
|
||||
(reset! (:*rtc-state state) :open)
|
||||
(let [{:keys [data-from-ws-pub _client-op-update-chan]} state
|
||||
push-data-from-ws-ch (chan (async/sliding-buffer 100) (map rtc-const/data-from-ws-coercer))
|
||||
|
@ -1002,39 +1000,39 @@
|
|||
(when loop-started-ch (async/close! loop-started-ch))
|
||||
(<?
|
||||
(go-try
|
||||
(loop [push-client-ops-ch
|
||||
(make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))]
|
||||
(let [{:keys [push-data-from-ws client-op-update stop continue]}
|
||||
(async/alt!
|
||||
toggle-auto-push-client-ops-ch {:continue true}
|
||||
force-push-client-ops-ch {:client-op-update true}
|
||||
push-client-ops-ch ([v] (if (and @*auto-push-client-ops? (true? v))
|
||||
{:client-op-update true}
|
||||
{:continue true}))
|
||||
push-data-from-ws-ch ([v] {:push-data-from-ws v})
|
||||
stop-rtc-loop-chan {:stop true}
|
||||
:priority true)]
|
||||
(cond
|
||||
continue
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?)))
|
||||
(loop [push-client-ops-ch
|
||||
(make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))]
|
||||
(let [{:keys [push-data-from-ws client-op-update stop continue]}
|
||||
(async/alt!
|
||||
toggle-auto-push-client-ops-ch {:continue true}
|
||||
force-push-client-ops-ch {:client-op-update true}
|
||||
push-client-ops-ch ([v] (if (and @*auto-push-client-ops? (true? v))
|
||||
{:client-op-update true}
|
||||
{:continue true}))
|
||||
push-data-from-ws-ch ([v] {:push-data-from-ws v})
|
||||
stop-rtc-loop-chan {:stop true}
|
||||
:priority true)]
|
||||
(cond
|
||||
continue
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?)))
|
||||
|
||||
push-data-from-ws
|
||||
(let [r (<! (<push-data-from-ws-handler state repo conn date-formatter push-data-from-ws))]
|
||||
(when (= r ::need-pull-remote-data)
|
||||
push-data-from-ws
|
||||
(let [r (<! (<push-data-from-ws-handler state repo conn date-formatter push-data-from-ws))]
|
||||
(when (= r ::need-pull-remote-data)
|
||||
;; trigger a force push, which can pull remote-diff-data from local-t to remote-t
|
||||
(async/put! force-push-client-ops-ch true))
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
|
||||
(async/put! force-push-client-ops-ch true))
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
|
||||
|
||||
client-op-update
|
||||
client-op-update
|
||||
;; FIXME: access token expired
|
||||
(let [_ (<? (<client-op-update-handler state token))]
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
|
||||
(let [_ (<? (<client-op-update-handler state token))]
|
||||
(recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
|
||||
|
||||
stop
|
||||
(stop-rtc-helper state)
|
||||
stop
|
||||
(stop-rtc-helper state)
|
||||
|
||||
:else
|
||||
nil)))))
|
||||
:else
|
||||
nil)))))
|
||||
(async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch)
|
||||
(catch :default e
|
||||
(case (:type (ex-data e))
|
||||
|
@ -1097,7 +1095,6 @@
|
|||
:force-push-client-ops-chan (chan (async/sliding-buffer 1))
|
||||
:*ws (atom ws)
|
||||
;; used to trigger state watch
|
||||
:counter 0
|
||||
:dev-mode? dev-mode?
|
||||
:*block-update-log (atom {})})
|
||||
|
||||
|
@ -1145,7 +1142,7 @@
|
|||
dev-mode?)]
|
||||
(when reset-*state?
|
||||
(reset! *state state)
|
||||
(swap! *state update :counter inc))
|
||||
(notify-main-thread state))
|
||||
state))))
|
||||
|
||||
(defn <start-rtc
|
||||
|
@ -1218,17 +1215,18 @@
|
|||
|
||||
;;; APIs (ends)
|
||||
|
||||
(defn- notify-main-thread
|
||||
[state]
|
||||
(when-let [*repo (:*repo state)]
|
||||
(let [repo @*repo
|
||||
new-state (get-debug-state repo state)]
|
||||
(when (= :open (:rtc-state new-state))
|
||||
(worker-util/post-message :rtc-sync-state new-state)))))
|
||||
|
||||
(add-watch *state :notify-main-thread
|
||||
(fn [_ _ old new]
|
||||
(when-let [*repo (:*repo new)]
|
||||
(let [repo @*repo
|
||||
new-state (get-debug-state repo new)
|
||||
old-state (get-debug-state repo old)]
|
||||
(when (or (not= new-state old-state)
|
||||
(= :open (:rtc-state new-state)))
|
||||
(worker-util/post-message :rtc-sync-state new-state))))))
|
||||
(fn [_ _ _ new] (notify-main-thread new)))
|
||||
|
||||
(add-watch op-mem-layer/*ops-store :update-ops-state
|
||||
(fn [_ _ _ _new]
|
||||
(when (and *state (:*repo @*state))
|
||||
(swap! *state update :counter (fnil inc 0)))))
|
||||
(notify-main-thread @*state))))
|
||||
|
|
Loading…
Reference in New Issue