enhance(rtc): update online-users in debug-ui

experiment/tanstack-table
rcmerci 2024-06-18 22:05:12 +08:00
parent c3f30a9dc9
commit 8588e6d8b2
4 changed files with 34 additions and 44 deletions

View File

@ -27,7 +27,6 @@
[reitit.frontend.easy :as rfe]
[rum.core :as rum]
[clojure.string :as string]
[frontend.handler.db-based.rtc :as rtc-handler]
[frontend.db :as db]
[logseq.db :as ldb]))
@ -61,20 +60,6 @@
[:span.ml-2 (ui/loading "")])]])))
(rum/defc rtc-collaborators < rum/reactive
{:will-mount (fn [state]
(let [*interval (atom nil)]
(reset! *interval
(js/setInterval
(fn []
(if (= :open (:ws-state (:rtc-state @(:rtc/state @state/state))))
(rtc-handler/<rtc-get-users-info)
(when @*interval (js/clearInterval @*interval))))
5000))
(assoc state ::interval *interval)))
:will-unmount (fn [state]
(when-let [interval @(::interval state)]
(js/clearInterval interval))
state)}
[]
(let [rtc-graph-id (ldb/get-graph-rtc-uuid (db/get-db))
online-users (->> (get (state/sub :rtc/users-info) (state/get-current-repo))

View File

@ -48,7 +48,8 @@
(let [debug-state* (rum/react debug-state)
rtc-logs @(get state ::logs)
rtc-state (:rtc-state debug-state*)
rtc-lock (:rtc-lock debug-state*)]
rtc-lock (:rtc-lock debug-state*)
online-users (:online-users debug-state*)]
[:div
{:on-click (fn [^js e]
(when-let [^js btn (.closest (.-target e) ".ui__button")]
@ -82,17 +83,7 @@
:graph<->user-user-type
:graph<->user-grant-by-user])))
graph-list)))))}
(shui/tabler-icon "download") "graph-list")
(shui/button
{:size :sm
:on-click #(let [token (state/get-auth-id-token)
^object worker @db-browser/*worker]
(when-let [graph-uuid (:graph-uuid debug-state*)]
(p/let [result (.rtc-get-users-info2 worker token graph-uuid)
result* (ldb/read-transit-str result)]
(swap! debug-state assoc :online-info result*))))}
(shui/tabler-icon "users") "online-info")]
(shui/tabler-icon "download") "graph-list")]
[:div.pb-4
[:pre.select-text
@ -103,7 +94,7 @@
:local-tx (:local-tx debug-state*)
:pending-block-update-count (:unpushed-block-update-count debug-state*)
:remote-graphs (:remote-graphs debug-state*)
:online-info (:online-info debug-state*)
:online-users (:online-users debug-state*)
:auto-push? (:auto-push? debug-state*)
:current-page (state/get-current-page)
:blocks-count (when-let [page (state/get-current-page)]

View File

@ -9,6 +9,7 @@
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.skeleton :as r.skeleton]
[frontend.worker.rtc.ws :as ws]
[frontend.worker.rtc.ws-util :as ws-util]
[missionary.core :as m]))
@ -30,7 +31,7 @@
"Return a task: get or create a mws(missionary wrapped websocket).
see also `ws/get-mws-create`.
But ensure `register-graph-updates` and `calibrate-graph-skeleton` has been sent"
[get-ws-create-task graph-uuid repo conn *last-calibrate-t]
[get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users]
(assert (some? graph-uuid))
(let [*sent (atom {}) ;; ws->bool
]
@ -39,6 +40,21 @@
(when-not (contains? @*sent ws)
(swap! *sent assoc ws false))
(when (not (@*sent ws))
(let [recv-flow (ws/recv-flow (m/? get-ws-create-task))]
(c.m/run-task
(m/sp
(when-let [online-users (:online-users
(m/?
(m/timeout
(m/reduce
(fn [_ v]
(when (= "online-users-updated" (:req-id v))
(reduced v)))
recv-flow)
10000)))]
(reset! *online-users online-users)))
:update-online-user-when-register-graph-updates
:succ (constantly nil)))
(m/? (c.m/backoff
(take 5 (drop 2 c.m/delays)) ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
(register-graph-updates get-ws-create-task graph-uuid repo)))

View File

@ -152,7 +152,7 @@
{:keys [*current-ws get-ws-create-task]}
(new-task--get-ws-create--memoized ws-url)
get-ws-create-task (r.client/ensure-register-graph-updates
get-ws-create-task graph-uuid repo conn *last-calibrate-t)
get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
(assert (some? *current-ws))
{:rtc-log-flow (m/watch *log)
@ -184,7 +184,7 @@
get-ws-create-task add-log-fn))
:online-users-updated
(reset! *online-users (:value event))))
(reset! *online-users (:online-users (:value event)))))
(m/ap)
(m/reduce {} nil)
(m/?))
@ -287,30 +287,30 @@
:block-uuids [block-uuid]
:graph-uuid graph-uuid}))))
(defn- create-get-state-flow
[]
(def ^:private create-get-state-flow
(let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
(m/ap
(let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *rtc-lock rtc-log-flow]}
(let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *rtc-lock rtc-log-flow *online-users]}
(m/?< rtc-loop-metadata-flow)]
(try
(when (and repo rtc-state-flow *rtc-auto-push? *rtc-lock rtc-log-flow)
(m/?<
(m/latest
(fn [rtc-state rtc-auto-push? rtc-lock]
(fn [rtc-state rtc-auto-push? rtc-lock online-users]
{:graph-uuid graph-uuid
:user-uuid user-uuid
:unpushed-block-update-count (op-mem-layer/get-unpushed-block-update-count repo)
:local-tx (op-mem-layer/get-local-tx repo)
:rtc-state rtc-state
:rtc-lock rtc-lock
:auto-push? rtc-auto-push?})
rtc-state-flow (m/watch *rtc-auto-push?) (m/watch *rtc-lock))))
:auto-push? rtc-auto-push?
:online-users online-users})
rtc-state-flow (m/watch *rtc-auto-push?) (m/watch *rtc-lock) (m/watch *online-users))))
(catch Cancelled _))))))
(defn new-task--get-debug-state
[]
(m/reduce {} nil (m/eduction (take 1) (create-get-state-flow))))
(m/reduce {} nil (m/eduction (take 1) create-get-state-flow)))
(defn new-task--snapshot-graph
[token graph-uuid]
@ -365,7 +365,7 @@
(let [cancel (c.m/run-task
(m/reduce
(fn [_ v] (worker-util/post-message :rtc-sync-state v))
(create-get-state-flow))
create-get-state-flow)
:subscribe-state)]
(reset! *last-subscribe-canceler cancel)
nil))
@ -374,7 +374,7 @@
;;; subscribe rtc logs
(def global-rtc-log-flow
(def ^:private global-rtc-log-flow
(let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
(m/ap
(let [{:keys [rtc-log-flow]} (m/?< rtc-loop-metadata-flow)]
@ -413,6 +413,4 @@
(def rtc-log-flow rtc-log-flow)
(def rtc-state-flow rtc-state-flow)
(def *rtc-auto-push? *rtc-auto-push?)))
(cancel)
)
(cancel))