enhance(rtc): add :local-tx :remote-tx for rtc-state map

experiment/tanstack-table
rcmerci 2024-06-21 13:00:12 +08:00
parent b0c03b0003
commit 420fe6edc9
6 changed files with 90 additions and 34 deletions

View File

@ -19,6 +19,7 @@
(atom {:pending-local-ops 0
:graph-uuid nil
:local-tx nil
:remote-tx nil
:rtc-state :open
:download-logs nil
:upload-logs nil
@ -47,6 +48,7 @@
:pending-local-ops (:unpushed-block-update-count state)
:graph-uuid (:graph-uuid state)
:local-tx (:local-tx state)
:remote-tx (:remote-tx state)
:rtc-state (if (:rtc-lock state) :open :close)))
rtc-flows/rtc-state-flow))
::update-detail-info)]
@ -57,7 +59,8 @@
(rum/local false ::expand-debug-info?)
[state online?]
(let [*expand-debug? (::expand-debug-info? state)
{:keys [graph-uuid local-tx rtc-state download-logs upload-logs misc-logs pending-local-ops pending-server-ops]} (rum/react *detail-info)]
{:keys [graph-uuid local-tx remote-tx rtc-state
download-logs upload-logs misc-logs pending-local-ops pending-server-ops]} (rum/react *detail-info)]
[:div.rtc-info.flex.flex-col.gap-1.p-2.text-gray-11
[:div.font-medium.mb-2 (if online? "Online" "Offline")]
[:div [:span.font-medium.mr-1 pending-local-ops] "pending local changes"]
@ -79,6 +82,7 @@
misc-logs (assoc :misc misc-logs)
graph-uuid (assoc :graph-uuid graph-uuid)
local-tx (assoc :local-tx local-tx)
remote-tx (assoc :remote-tx remote-tx)
rtc-state (assoc :rtc-state rtc-state))
(fipp/pprint {:width 20})
with-out-str)]])]))

View File

@ -6,6 +6,7 @@
[frontend.common.missionary-util :as c.m]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.rtc.exception :as r.ex]
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.skeleton :as r.skeleton]
@ -17,11 +18,12 @@
[get-ws-create-task graph-uuid repo]
(m/sp
(try
(let [{:keys [t]}
(let [{remote-t :t}
(m/? (ws-util/send&recv get-ws-create-task {:action "register-graph-updates"
:graph-uuid graph-uuid}))]
(rtc-log-and-state/update-remote-t graph-uuid remote-t)
(when-not (op-mem-layer/get-local-tx repo)
(op-mem-layer/update-local-tx! repo t)))
(op-mem-layer/update-local-tx! repo remote-t)))
(catch :default e
(if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e)))
(throw (ex-info "remote graph is still creating" {:missionary/retry true} e))
@ -367,7 +369,7 @@
(do (assert (pos? (:t r)) r)
(op-mem-layer/commit! repo)
(r.remote-update/apply-remote-update
repo conn date-formatter {:type :remote-update :value r} add-log-fn)
graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
(add-log-fn :rtc.log/push-local-update {:remote-t (:t r)})))))
(op-mem-layer/rollback! repo))))
@ -387,6 +389,6 @@
(throw (ex-info "Unavailable" {:remote-ex remote-ex}))))
(do (assert (pos? (:t r)) r)
(r.remote-update/apply-remote-update
repo conn date-formatter {:type :remote-update :value r} add-log-fn)
graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
(add-log-fn :rtc.log/push-local-update {:sub-type :pull-remote-data
:remote-t (:t r) :local-t local-tx}))))))

View File

@ -4,7 +4,7 @@
[frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.exception :as r.ex]
[frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
[frontend.worker.rtc.log :as rtc-log]
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.skeleton]
@ -146,7 +146,7 @@
started-dfv (m/dfv)
add-log-fn (fn [type message]
(assert (map? message) message)
(rtc-log/rtc-log type (assoc message :graph-uuid graph-uuid)))
(rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
{:keys [*current-ws get-ws-create-task]}
(new-task--get-ws-create--memoized ws-url)
get-ws-create-task (r.client/ensure-register-graph-updates
@ -169,7 +169,7 @@
(let [event (m/?> mixed-flow)]
(case (:type event)
:remote-update
(try (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn)
(try (r.remote-update/apply-remote-update graph-uuid repo conn date-formatter event add-log-fn)
(catch :default e
(when (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
(m/? (r.client/new-task--pull-remote-data
@ -291,17 +291,20 @@
(when (and repo rtc-state-flow *rtc-auto-push? *rtc-lock)
(m/?<
(m/latest
(fn [rtc-state rtc-auto-push? rtc-lock online-users pending-local-ops-count]
(fn [rtc-state rtc-auto-push? rtc-lock online-users pending-local-ops-count local-tx remote-tx]
{:graph-uuid graph-uuid
:user-uuid user-uuid
:unpushed-block-update-count pending-local-ops-count
:local-tx (op-mem-layer/get-local-tx repo)
:local-tx local-tx
:remote-tx remote-tx
:rtc-state rtc-state
:rtc-lock rtc-lock
:auto-push? rtc-auto-push?
:online-users online-users})
rtc-state-flow (m/watch *rtc-auto-push?) (m/watch *rtc-lock) (m/watch *online-users)
(op-mem-layer/create-pending-ops-count-flow repo))))
(op-mem-layer/create-pending-ops-count-flow repo)
(rtc-log-and-state/create-local-t-flow graph-uuid)
(rtc-log-and-state/create-remote-t-flow graph-uuid))))
(catch Cancelled _))))))
(defn new-task--get-debug-state

View File

@ -5,7 +5,7 @@
[clojure.set :as set]
[datascript.core :as d]
[frontend.common.missionary-util :as c.m]
[frontend.worker.rtc.log :as rtc-log]
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.ws-util :as ws-util]
[frontend.worker.state :as worker-state]
@ -110,7 +110,7 @@
(defn new-task--upload-graph
[get-ws-create-task repo conn remote-graph-name]
(m/sp
(rtc-log/rtc-log :rtc.log/upload {:sub-type :fetch-presigned-put-url
(rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :fetch-presigned-put-url
:message "fetching presigned put-url"})
(let [[{:keys [url key]} all-blocks-str]
(m/?
@ -120,10 +120,10 @@
(m/sp
(let [all-blocks (export-as-blocks @conn)]
(ldb/write-transit-str all-blocks)))))]
(rtc-log/rtc-log :rtc.log/upload {:sub-type :upload-data
(rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-data
:message "uploading data"})
(m/? (c.m/<! (http/put url {:body all-blocks-str :with-credentials? false})))
(rtc-log/rtc-log :rtc.log/upload {:sub-type :request-upload-graph
(rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :request-upload-graph
:message "requesting upload-graph"})
(let [upload-resp
(m/? (ws-util/send&recv get-ws-create-task {:action "upload-graph"
@ -138,7 +138,7 @@
(op-mem-layer/init-empty-ops-store! repo)
(op-mem-layer/update-graph-uuid! repo graph-uuid)
(m/? (op-mem-layer/new-task--sync-to-idb repo))
(rtc-log/rtc-log :rtc.log/upload {:sub-type :upload-completed
(rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
:message "upload-graph completed"})
nil)
(throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))))))
@ -245,18 +245,20 @@
^js worker-obj (:worker/object @worker-state/*state)]
(m/sp
(op-mem-layer/update-local-tx! repo t)
(rtc-log-and-state/update-local-t graph-uuid t)
(rtc-log-and-state/update-remote-t graph-uuid t)
(m/?
(c.m/await-promise
(p/do!
(.createOrOpenDB worker-obj repo (ldb/write-transit-str {:close-other-db? false}))
(.exportDB worker-obj repo)
(.transact worker-obj repo init-tx-data {:rtc-download-graph? true
:gen-undo-ops? false
:persist-op? false} (worker-state/get-context))
(.transact worker-obj repo tx-data {:rtc-download-graph? true
:gen-undo-ops? false
:persist-op? false} (worker-state/get-context))
(transact-block-refs! repo))))
(.createOrOpenDB worker-obj repo (ldb/write-transit-str {:close-other-db? false}))
(.exportDB worker-obj repo)
(.transact worker-obj repo init-tx-data {:rtc-download-graph? true
:gen-undo-ops? false
:persist-op? false} (worker-state/get-context))
(.transact worker-obj repo tx-data {:rtc-download-graph? true
:gen-undo-ops? false
:persist-op? false} (worker-state/get-context))
(transact-block-refs! repo))))
(worker-util/post-message :add-repo {:repo repo}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -265,7 +267,7 @@
(defn new-task--request-download-graph
[get-ws-create-task graph-uuid]
(rtc-log/rtc-log :rtc.log/download {:sub-type :request-download-graph
(rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :request-download-graph
:message "requesting download graph"
:graph-uuid graph-uuid})
(m/join :download-info-uuid
@ -282,7 +284,7 @@
[get-ws-create-task download-info-uuid graph-uuid timeout-ms]
(->
(m/sp
(rtc-log/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
(rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
:message "waiting for the remote to prepare the data"
:graph-uuid graph-uuid})
(loop []
@ -304,7 +306,7 @@
(defn new-task--download-graph-from-s3
[graph-uuid graph-name s3-url]
(m/sp
(rtc-log/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
(rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
:message "downloading graph data"
:graph-uuid graph-uuid})
(let [^js worker-obj (:worker/object @worker-state/*state)
@ -313,7 +315,7 @@
(if (not= 200 status)
(throw (ex-info "download-graph from s3 failed" {:resp r}))
(do
(rtc-log/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
(rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
:message "transacting graph data to local db"
:graph-uuid graph-uuid})
(let [all-blocks (ldb/read-transit-str body)]
@ -324,7 +326,7 @@
(m/? (op-mem-layer/new-task--sync-to-idb repo))
(m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
(worker-state/set-rtc-downloading-graph! false)
(rtc-log/rtc-log :rtc.log/download {:sub-type :download-completed
(rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed
:message "download completed"
:graph-uuid graph-uuid})
nil))))))

View File

@ -1,4 +1,4 @@
(ns frontend.worker.rtc.log
(ns frontend.worker.rtc.log-and-state
"Fns to generate rtc related logs"
(:require [frontend.common.missionary-util :as c.m]
[frontend.schema-register :as sr]
@ -32,6 +32,48 @@
(def rtc-log-flow (m/watch *rtc-log))
;;; some other states
(def ^:private graph-uuid->t-schema
[:map-of :uuid :int])
(def ^:private graph-uuid->t-validator (ma/validator graph-uuid->t-schema))
(def ^:private graph-uuid->t-validator* (fn [v] (if (graph-uuid->t-validator v)
true
(do (prn :debug-graph-uuid->t-validator v)
false))))
(def *graph-uuid->local-t (atom {} :validator graph-uuid->t-validator*))
(def *graph-uuid->remote-t (atom {} :validator graph-uuid->t-validator*))
(defn- ensure-uuid
[v]
(cond
(uuid? v) v
(string? v) (uuid v)
:else (throw (ex-info "illegal value" {:data v}))))
(defn create-local-t-flow
[graph-uuid]
(m/eduction
(map (fn [m] (get m (ensure-uuid graph-uuid))))
(m/watch *graph-uuid->local-t)))
(defn create-remote-t-flow
[graph-uuid]
{:pre [(some? graph-uuid)]}
(m/eduction
(map (fn [m] (get m (ensure-uuid graph-uuid))))
(m/watch *graph-uuid->remote-t)))
(defn update-local-t
[graph-uuid local-t]
(swap! *graph-uuid->local-t assoc (ensure-uuid graph-uuid) local-t))
(defn update-remote-t
[graph-uuid remote-t]
(swap! *graph-uuid->remote-t assoc (ensure-uuid graph-uuid) remote-t))
;;; subscribe-logs, push to frontend
(defonce ^:private *last-subscribe-logs-canceler (atom nil))
(defn- subscribe-logs

View File

@ -5,9 +5,9 @@
[clojure.string :as string]
[datascript.core :as d]
[frontend.schema-register :as sr]
[logseq.outliner.batch-tx :as batch-tx]
[frontend.worker.handler.page :as worker-page]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.state :as worker-state]
[frontend.worker.util :as worker-util]
@ -16,6 +16,7 @@
[logseq.db :as ldb]
[logseq.db.frontend.property.util :as db-property-util]
[logseq.graph-parser.whiteboard :as gp-whiteboard]
[logseq.outliner.batch-tx :as batch-tx]
[logseq.outliner.core :as outliner-core]
[logseq.outliner.transaction :as outliner-tx]))
@ -520,7 +521,7 @@
(defn apply-remote-update
"Apply remote-update(`remote-update-event`)"
[repo conn date-formatter remote-update-event add-log-fn]
[graph-uuid repo conn date-formatter remote-update-event add-log-fn]
(let [remote-update-data (:value remote-update-event)]
(assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data)
(let [remote-t (:t remote-update-data)
@ -563,7 +564,9 @@
(worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
(js/console.groupEnd)
(op-mem-layer/update-local-tx! repo remote-t))
(op-mem-layer/update-local-tx! repo remote-t)
(rtc-log-and-state/update-local-t graph-uuid remote-t)
(rtc-log-and-state/update-remote-t graph-uuid remote-t))
:else (throw (ex-info "unreachable" {:remote-t remote-t
:remote-t-before remote-t-before
:local-t local-tx}))))))