From b0673fc850323384734113552a1fa4365a2b7f66 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Sun, 5 May 2024 18:20:00 +0800 Subject: [PATCH] refactor(rtc): remake async upload-graph --- .../src/logseq/common/missionary_util.cljs | 10 +++- src/main/frontend/db/rtc/debug_ui.cljs | 16 +++--- src/main/frontend/db_worker.cljs | 20 ++----- src/main/frontend/handler/db_based/rtc.cljs | 2 +- src/main/frontend/worker/rtc/client.cljs | 49 ++++++++--------- src/main/frontend/worker/rtc/core2.cljs | 54 ++++++++++--------- src/main/frontend/worker/rtc/exception.cljs | 6 +++ .../rtc/full_upload_download_graph.cljs | 36 +++++++++++-- src/main/frontend/worker/rtc/ws2.cljs | 32 +---------- 9 files changed, 114 insertions(+), 111 deletions(-) diff --git a/deps/common/src/logseq/common/missionary_util.cljs b/deps/common/src/logseq/common/missionary_util.cljs index edd84f803..ce0f28e35 100644 --- a/deps/common/src/logseq/common/missionary_util.cljs +++ b/deps/common/src/logseq/common/missionary_util.cljs @@ -55,7 +55,7 @@ (defn run-task "Return the canceler" [task key & {:keys [succ fail]}] - (task (or succ #(prn key :succ %)) (or fail #(js/console.log key (or (some-> % .-stack) %))))) + (task (or succ #(prn key :succ %)) (or fail #(js/console.log key %)))) (defn >! "Return a task that @@ -67,3 +67,11 @@ "Return a task that takes from given channel, completing with value when take is accepted, or nil if port was closed." [c] (doto (m/dfv) (->> (a/take! c)))) + + +(defn await-promise + "Returns a task completing with the result of given promise" + [p] + (let [v (m/dfv)] + (.then p #(v (fn [] %)) #(v (fn [] (throw %)))) + (m/absolve v))) diff --git a/src/main/frontend/db/rtc/debug_ui.cljs b/src/main/frontend/db/rtc/debug_ui.cljs index 76985c0c6..0fb5972f3 100644 --- a/src/main/frontend/db/rtc/debug_ui.cljs +++ b/src/main/frontend/db/rtc/debug_ui.cljs @@ -193,7 +193,7 @@ token (state/get-auth-id-token) remote-graph-name (:upload-as-graph-name state) ^js worker @db-browser/*worker] - (.rtc-async-upload-graph worker repo token remote-graph-name)))}) + (.rtc-async-upload-graph2 worker repo token remote-graph-name)))}) [:b "➡️"] [:input.form-input.my-2.py-1.w-32 {:on-change (fn [e] (swap! debug-state assoc :upload-as-graph-name (util/evalue e))) @@ -206,15 +206,11 @@ (ui/button (str "delete graph") {:icon "trash" :on-click (fn [] - (-> (shui/dialog-confirm! - {:title [:p.flex.flex-col.gap-1 - [:b "Are you sure delete current graph?"] - [:small.line-through.opacity-80 (state/get-current-repo)]]}) - (p/then #((when-let [graph-uuid (:graph-uuid-to-delete state)] - (let [token (state/get-auth-id-token) - ^object worker @db-browser/*worker] - (prn ::delete-graph graph-uuid) - (.rtc-delete-graph2 worker token graph-uuid)))))))}) + (when-let [graph-uuid (:graph-uuid-to-delete state)] + (let [token (state/get-auth-id-token) + ^object worker @db-browser/*worker] + (prn ::delete-graph graph-uuid) + (.rtc-delete-graph2 worker token graph-uuid))))}) (shui/select {:on-value-change (fn [v] diff --git a/src/main/frontend/db_worker.cljs b/src/main/frontend/db_worker.cljs index 06e7c186d..8dbf24dd8 100644 --- a/src/main/frontend/db_worker.cljs +++ b/src/main/frontend/db_worker.cljs @@ -603,24 +603,12 @@ (with-write-transit-str (js/Promise. (rtc-core2/new-task--get-debug-state)))) - ;; ================================================================ - (rtc-async-upload-graph + (rtc-async-upload-graph2 [this repo token remote-graph-name] - (let [d (p/deferred)] - (when-let [conn (worker-state/get-datascript-conn repo)] - (async/go - (try - (let [state (p diff --git a/src/main/frontend/handler/db_based/rtc.cljs b/src/main/frontend/handler/db_based/rtc.cljs index e70240f88..412ab2d9e 100644 --- a/src/main/frontend/handler/db_based/rtc.cljs +++ b/src/main/frontend/handler/db_based/rtc.cljs @@ -16,7 +16,7 @@ (user-handler/bool + (let [*sent (atom {}) ;; ws->bool ] (m/sp - (let [mws (m/? get-mws-create-task)] - (when-not (contains? @*sent mws) - (swap! *sent assoc mws false)) - (when (not (@*sent mws)) + (let [ws (m/? get-ws-create-task)] + (when-not (contains? @*sent ws) + (swap! *sent assoc ws false)) + (when (not (@*sent ws)) (m/? (c.m/backoff (take 5 c.m/delays) ;retry 5 times (32s) if remote-graph is creating - (register-graph-updates get-mws-create-task graph-uuid))) - (swap! *sent assoc mws true)) - mws)))) + (register-graph-updates get-ws-create-task graph-uuid))) + (swap! *sent assoc ws true)) + ws)))) (def ensure-register-graph-updates (memoize ensure-register-graph-updates*)) - (defn- remove-non-exist-block-uuids-in-add-retract-map [conn add-retract-map] (let [{:keys [add retract]} add-retract-map @@ -305,15 +306,15 @@ (defn new-task--push-local-ops "Return a task: push local updates" - [repo conn user-uuid graph-uuid date-formatter get-mws-create-task add-log-fn] + [repo conn user-uuid graph-uuid date-formatter get-ws-create-task add-log-fn] (m/sp (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder (sort-remote-ops (gen-block-uuid->remote-ops repo conn user-uuid)))] (op-mem-layer/new-branch! repo) (let [local-tx (op-mem-layer/get-local-tx repo) - r (m/? (send&recv get-mws-create-task {:action "apply-ops" :graph-uuid graph-uuid - :ops ops-for-remote :t-before (or local-tx 1)}))] + r (m/? (send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid + :ops ops-for-remote :t-before (or local-tx 1)}))] (if-let [remote-ex (:ex-data r)] (do (add-log-fn remote-ex) (case (:type remote-ex) @@ -329,8 +330,8 @@ :graph-lock-missing (do (op-mem-layer/rollback! repo) (throw r.ex/ex-remote-graph-lock-missing)) - ;; TODO: support read s3-obj when websocket return specific data - :get-s3-object-failed + + :rtc.exception/get-s3-object-failed (do (op-mem-layer/rollback! repo) nil) ;; else diff --git a/src/main/frontend/worker/rtc/core2.cljs b/src/main/frontend/worker/rtc/core2.cljs index f84a3a023..adeaed736 100644 --- a/src/main/frontend/worker/rtc/core2.cljs +++ b/src/main/frontend/worker/rtc/core2.cljs @@ -1,8 +1,8 @@ (ns frontend.worker.rtc.core2 "Main(use missionary) ns for rtc related fns" (:require [frontend.worker.rtc.client :as r.client] - [frontend.worker.rtc.const :as rtc-const] [frontend.worker.rtc.exception :as r.ex] + [frontend.worker.rtc.full-upload-download-graph :as r.upload-download] [frontend.worker.rtc.op-mem-layer :as op-mem-layer] [frontend.worker.rtc.remote-update :as r.remote-update] [frontend.worker.rtc.ws2 :as ws] @@ -67,7 +67,6 @@ (create-local-updates-check-flow repo *auto-push? 2000))] (c.m/mix remote-updates-flow local-updates-check-flow))) - (defn- new-task--get-ws-create "Return a map with atom *current-ws and a task that get current ws, create one if needed(closed or not created yet)" @@ -232,11 +231,11 @@ [token graph-uuid] (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))] (m/sp - (let [{:keys [ex-data]} - (m/? (r.client/send&recv get-ws-create-task - {:action "delete-graph" :graph-uuid graph-uuid}))] - (when ex-data (prn ::delete-graph-failed graph-uuid ex-data)) - (boolean (nil? ex-data)))))) + (let [{:keys [ex-data]} + (m/? (r.client/send&recv get-ws-create-task + {:action "delete-graph" :graph-uuid graph-uuid}))] + (when ex-data (prn ::delete-graph-failed graph-uuid ex-data)) + (boolean (nil? ex-data)))))) (defn new-task--get-user-info "Return a task that return users-info about the graph." @@ -268,20 +267,20 @@ [] (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)] (m/ap - (let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push?]} - (m/?< rtc-loop-metadata-flow)] - (try - (when (and repo rtc-state-flow *rtc-auto-push?) - (m/?< - (m/latest - (fn [rtc-state rtc-auto-push?] - {:graph-uuid graph-uuid - :user-uuid user-uuid - :unpushed-block-update-count (op-mem-layer/get-unpushed-block-update-count repo) - :rtc-state rtc-state - :auto-push? rtc-auto-push?}) - rtc-state-flow (m/watch *rtc-auto-push?)))) - (catch Cancelled _)))))) + (let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push?]} + (m/?< rtc-loop-metadata-flow)] + (try + (when (and repo rtc-state-flow *rtc-auto-push?) + (m/?< + (m/latest + (fn [rtc-state rtc-auto-push?] + {:graph-uuid graph-uuid + :user-uuid user-uuid + :unpushed-block-update-count (op-mem-layer/get-unpushed-block-update-count repo) + :rtc-state rtc-state + :auto-push? rtc-auto-push?}) + rtc-state-flow (m/watch *rtc-auto-push?)))) + (catch Cancelled _)))))) (defn new-task--get-debug-state [] @@ -300,6 +299,14 @@ (r.client/send&recv get-ws-create-task {:action "snapshot-list" :graph-uuid graph-uuid})))) +(defn new-task--upload-graph + [token repo remote-graph-name] + (m/sp + (if-let [conn (worker-state/get-datascript-conn repo)] + (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))] + (m/? (r.upload-download/new-task--upload-graph get-ws-create-task repo conn remote-graph-name))) + (r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn + :repo repo}))))) ;;; subscribe debug state ;;; @@ -319,7 +326,6 @@ (subscribe-debug-state) - (comment (do (def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072") @@ -335,6 +341,4 @@ (def rtc-log-flow rtc-log-flow) (def rtc-state-flow rtc-state-flow) (def *rtc-auto-push? *rtc-auto-push?))) - (cancel) - - ) + (cancel)) diff --git a/src/main/frontend/worker/rtc/exception.cljs b/src/main/frontend/worker/rtc/exception.cljs index a5c7939ff..7467255e7 100644 --- a/src/main/frontend/worker/rtc/exception.cljs +++ b/src/main/frontend/worker/rtc/exception.cljs @@ -22,6 +22,12 @@ Trying to start rtc loop but there's already one running, need to cancel that on (sr/defkeyword :rtc.exception/not-found-db-conn "Local exception. Cannot find db-conn by repo") +(sr/defkeyword :rtc.exception/get-s3-object-failed + "Failed to fetch response from s3. +When response from remote is too huge(> 32KB), +the server will put it to s3 and return its presigned-url to clients.") + + (def ex-remote-graph-not-exist (ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist})) diff --git a/src/main/frontend/worker/rtc/full_upload_download_graph.cljs b/src/main/frontend/worker/rtc/full_upload_download_graph.cljs index 1b3cf1a05..71ef2dfc8 100644 --- a/src/main/frontend/worker/rtc/full_upload_download_graph.cljs +++ b/src/main/frontend/worker/rtc/full_upload_download_graph.cljs @@ -9,17 +9,20 @@ [cognitect.transit :as transit] [datascript.core :as d] [frontend.worker.async-util :include-macros true :refer [clj (js/JSON.parse body) :keywordize-keys true) {:req-id req-id :ex-message "get s3 object failed" - :ex-data {:type :get-s3-object-failed :status status :body body}})) + :ex-data {:type :rtc.exception/get-s3-object-failed :status status :body body}})) resp))))