enhance(rtc): handle graph confict-update case

pull/10438/head
rcmerci 2023-09-10 01:56:36 +08:00
parent 36edf87db3
commit bc904aa8d7
5 changed files with 81 additions and 55 deletions

View File

@ -187,16 +187,16 @@
(defn apply-remote-move-ops
[state sorted-move-ops]
(prn :sorted-move-ops sorted-move-ops)
(doseq [{:keys [parents left self first-child sibling content]}
sorted-move-ops]
(case (check-block-pos state self parents left)
:not-exist
(insert-or-move-block state self parents left content false)
:wrong-pos
(insert-or-move-block state self parents left content true)
nil ; do nothing
nil)
(prn :apply-remote-move-ops self)))
(doseq [{:keys [parents left self content]} sorted-move-ops]
(let [r (check-block-pos state self parents left)]
(case r
:not-exist
(insert-or-move-block state self parents left content false)
:wrong-pos
(insert-or-move-block state self parents left content true)
nil ; do nothing
nil)
(prn :apply-remote-move-ops self r parents left))))
(defn apply-remote-update-ops
@ -204,7 +204,7 @@
{:pre [(some? @(:*repo state))]}
(let [repo @(:*repo state)]
(prn :update-ops update-ops)
(doseq [{:keys [parents left self first-child sibling content]}
(doseq [{:keys [parents left self content]}
update-ops]
(let [r (check-block-pos state self parents left)]
(case r
@ -248,29 +248,33 @@
{:pre [(data-from-ws-validator data-from-ws)
(some? @(:*repo state))]}
(go
(let [affected-blocks-map (update-keys (:affected-blocks data-from-ws) name)
(let [repo @(:*repo state)
affected-blocks-map (update-keys (:affected-blocks data-from-ws) name)
remote-t (:t data-from-ws)
{remove-ops-map "remove" move-ops-map "move" update-ops-map "update-attrs"
update-page-ops-map "update-page" remove-page-ops-map "remove-page"}
(update-vals
(group-by (fn [[_ env]] (get env :op)) affected-blocks-map)
(partial into {}))
remove-ops (vals remove-ops-map)
sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map)
update-ops (vals update-ops-map)
update-page-ops (vals update-page-ops-map)
remove-page-ops (vals remove-page-ops-map)]
(prn :start-apply-remote-update-page-ops)
(apply-remote-update-page-ops state update-page-ops)
(prn :start-apply-remote-remove-ops)
(apply-remote-remove-ops state remove-ops)
(prn :start-apply-remote-move-ops)
(apply-remote-move-ops state sorted-move-ops)
(prn :start-apply-remote-update-ops)
(apply-remote-update-ops state update-ops)
(prn :start-apply-remote-remove-page-ops)
(apply-remote-remove-page-ops state remove-page-ops)
(<! (p->c (op/<update-local-tx! @(:*repo state) remote-t))))))
local-t (<! (p->c (op/<get-ops&local-tx repo)))]
(if (<= remote-t local-t)
(prn :skip :remote-t remote-t :local-t local-t)
(let [{remove-ops-map "remove" move-ops-map "move" update-ops-map "update-attrs"
update-page-ops-map "update-page" remove-page-ops-map "remove-page"}
(update-vals
(group-by (fn [[_ env]] (get env :op)) affected-blocks-map)
(partial into {}))
remove-ops (vals remove-ops-map)
sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map)
update-ops (vals update-ops-map)
update-page-ops (vals update-page-ops-map)
remove-page-ops (vals remove-page-ops-map)]
(prn :start-apply-remote-update-page-ops)
(apply-remote-update-page-ops state update-page-ops)
(prn :start-apply-remote-remove-ops)
(apply-remote-remove-ops state remove-ops)
(prn :start-apply-remote-move-ops)
(apply-remote-move-ops state sorted-move-ops)
(prn :start-apply-remote-update-ops)
(apply-remote-update-ops state update-ops)
(prn :start-apply-remote-remove-page-ops)
(apply-remote-remove-page-ops state remove-page-ops)
(<! (p->c (op/<update-local-tx! repo remote-t))))))))
(defn- <push-data-from-ws-handler
[state push-data-from-ws]
@ -314,12 +318,8 @@
remove-page-uuids (conj remove-page-uuids block-uuid)]
(recur other-ops remove-block-uuids update-block-uuids move-block-uuids update-page-uuids remove-page-uuids))
(throw (ex-info "unknown op type" op)))))
{move-ops "move" remove-ops "remove" _update-ops "update"} (group-by first ops)
move-block-uuids (->> move-ops
(keep (fn [op]
(let [block-uuids (set (:block-uuids (second op)))]
(seq (set/intersection move-block-uuids-set block-uuids)))))
(apply concat))
{_move-ops "move" remove-ops "remove" _update-ops "update"} (group-by first ops)
move-block-uuids (seq move-block-uuids-set)
remove-block-uuids-groups (->> remove-ops
(keep (fn [op]
(let [block-uuids (set (:block-uuids (second op)))]
@ -381,9 +381,23 @@
:action "apply-ops" :graph-uuid @(:*graph-uuid state)
:ops ops-for-remote :t-before (or local-tx 1)}))
(<! (get-result-ch)))]
(<! (p->c (op/<clean-ops repo op-keys)))
(<! (<apply-remote-data state r))
(prn :<client-op-update-handler r))))
(if-let [remote-ex (:ex-data r)]
(case (:type remote-ex)
;; conflict-update remote-graph, keep these local-pending-ops
;; and try to send ops later
"graph-lock-failed"
(do (prn :graph-lock-failed)
nil)
;; this case means something wrong in remote-graph data,
;; nothing to do at client-side
"graph-lock-missing"
(do (prn :graph-lock-missing)
nil)
;; else
(throw (ex-info "Unavailable" {})))
(do (<! (p->c (op/<clean-ops repo op-keys)))
(<! (<apply-remote-data state r))
(prn :<client-op-update-handler r))))))
(defn <loop-for-rtc
[state graph-uuid repo]

View File

@ -13,9 +13,10 @@
[frontend.util :as util]
[frontend.handler.notification :as notification]))
(def ^:private debug-state (atom nil))
(defonce ^:private debug-state (atom nil))
(def debug-graph-uuid "6478874f-20a7-4335-9379-4cfb1cfa1b25")
(defn- <start
[]
(go

View File

@ -1,21 +1,23 @@
(ns frontend.db.rtc.macro)
(def ^:private magic-str "YBTFRD")
(defmacro with-sub-data-from-ws
"TODO: handle exceptions for ws
- sub :data-from-ws-pub
"- sub :data-from-ws-pub
- run body, use `get-req-id` to get req-id, and `get-result-ch` to get result-ch
- unsub :data-from-ws-pub"
[state & body]
`(let [~'req-id (str (random-uuid))
data-from-ws-pub# (:data-from-ws-pub ~state)
~'result-ch (cljs.core.async/chan 1)]
(cljs.core.async/sub data-from-ws-pub# ~'req-id ~'result-ch)
(try
~@body
(finally
(cljs.core.async/unsub data-from-ws-pub# ~'req-id ~'result-ch)))))
(let [req-id-sym (symbol (str magic-str "-req-id"))
result-ch-sym (symbol (str magic-str "-result-ch"))]
`(let [~req-id-sym (str (random-uuid))
data-from-ws-pub# (:data-from-ws-pub ~state)
~result-ch-sym (cljs.core.async/chan 1)]
(cljs.core.async/sub data-from-ws-pub# ~req-id-sym ~result-ch-sym)
(try
~@body
(finally
(cljs.core.async/unsub data-from-ws-pub# ~req-id-sym ~result-ch-sym))))))
(defmacro get-req-id [] 'req-id)
(defmacro get-result-ch [] 'result-ch)
(defmacro get-req-id [] (symbol (str magic-str "-req-id")))
(defmacro get-result-ch [] (symbol (str magic-str "-result-ch")))

View File

@ -56,3 +56,7 @@
(defn <get-graph-uuid
[repo]
(op-store/<get-graph-uuid repo))
(defn <get-local-tx
[repo]
(op-store/<get-local-tx repo))

View File

@ -66,3 +66,8 @@
[repo]
(p/let [store (ensure-store repo)]
(idb-keyval/get "graph-uuid" store)))
(defn <get-local-tx
[repo]
(p/let [store (ensure-store repo)]
(idb-keyval/get "local-tx" store)))