refactor(rtc): remake async upload-graph

pull/11293/head
rcmerci 2024-05-05 18:20:00 +08:00
parent 9bd1b41441
commit b0673fc850
9 changed files with 114 additions and 111 deletions

View File

@ -55,7 +55,7 @@
(defn run-task
"Return the canceler"
[task key & {:keys [succ fail]}]
(task (or succ #(prn key :succ %)) (or fail #(js/console.log key (or (some-> % .-stack) %)))))
(task (or succ #(prn key :succ %)) (or fail #(js/console.log key %))))
(defn >!
"Return a task that
@ -67,3 +67,11 @@
"Return a task that takes from given channel,
completing with value when take is accepted, or nil if port was closed."
[c] (doto (m/dfv) (->> (a/take! c))))
(defn await-promise
"Returns a task completing with the result of given promise"
[p]
(let [v (m/dfv)]
(.then p #(v (fn [] %)) #(v (fn [] (throw %))))
(m/absolve v)))

View File

@ -193,7 +193,7 @@
token (state/get-auth-id-token)
remote-graph-name (:upload-as-graph-name state)
^js worker @db-browser/*worker]
(.rtc-async-upload-graph worker repo token remote-graph-name)))})
(.rtc-async-upload-graph2 worker repo token remote-graph-name)))})
[:b "➡️"]
[:input.form-input.my-2.py-1.w-32
{:on-change (fn [e] (swap! debug-state assoc :upload-as-graph-name (util/evalue e)))
@ -206,15 +206,11 @@
(ui/button (str "delete graph")
{:icon "trash"
:on-click (fn []
(-> (shui/dialog-confirm!
{:title [:p.flex.flex-col.gap-1
[:b "Are you sure delete current graph?"]
[:small.line-through.opacity-80 (state/get-current-repo)]]})
(p/then #((when-let [graph-uuid (:graph-uuid-to-delete state)]
(let [token (state/get-auth-id-token)
^object worker @db-browser/*worker]
(prn ::delete-graph graph-uuid)
(.rtc-delete-graph2 worker token graph-uuid)))))))})
(when-let [graph-uuid (:graph-uuid-to-delete state)]
(let [token (state/get-auth-id-token)
^object worker @db-browser/*worker]
(prn ::delete-graph graph-uuid)
(.rtc-delete-graph2 worker token graph-uuid))))})
(shui/select
{:on-value-change (fn [v]

View File

@ -603,24 +603,12 @@
(with-write-transit-str
(js/Promise. (rtc-core2/new-task--get-debug-state))))
;; ================================================================
(rtc-async-upload-graph
(rtc-async-upload-graph2
[this repo token remote-graph-name]
(let [d (p/deferred)]
(when-let [conn (worker-state/get-datascript-conn repo)]
(async/go
(try
(let [state (<? (rtc-core/<init-state token false))
r (<? (rtc-updown/<async-upload-graph state repo conn remote-graph-name))]
(p/resolve! d r))
(catch :default e
(worker-util/post-message :notification
[[:div
[:p "upload graph failed"]]
:error])
(p/reject! d e)))))
d))
(with-write-transit-str
(js/Promise. (rtc-core2/new-task--upload-graph token repo remote-graph-name))))
;; ================================================================
(rtc-request-download-graph
[this token graph-uuid]
(async-util/c->p

View File

@ -16,7 +16,7 @@
(user-handler/<wrap-ensure-id&access-token
(let [token (state/get-auth-id-token)
repo-name (sqlite-common-db/sanitize-db-name repo)]
(.rtc-async-upload-graph worker repo token repo-name)))))
(.rtc-async-upload-graph2 worker repo token repo-name)))))
(defn <rtc-delete-graph!
[graph-uuid]

View File

@ -23,42 +23,43 @@
(defn send&recv
"Return a task: throw exception if recv ex-data response"
[get-mws-create-task message]
[get-ws-create-task message]
(m/sp
(let [mws (m/? get-mws-create-task)]
(handle-remote-ex (m/? (ws/send&recv mws message))))))
(let [ws (m/? get-ws-create-task)]
(handle-remote-ex (m/? (ws/send&recv ws message))))))
(defn- register-graph-updates
[get-mws-create-task graph-uuid]
[get-ws-create-task graph-uuid]
(m/sp
(let [{:keys [ex-data]}
(m/? (send&recv get-mws-create-task {:action "register-graph-updates"
:graph-uuid graph-uuid}))]
(when (= :graph-not-ready (:type ex-data))
(throw (ex-info "remote graph is still creating" {:missionary/retry true}))))))
(try
(m/? (send&recv get-ws-create-task {:action "register-graph-updates"
:graph-uuid graph-uuid}))
(catch :default e
(if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e)))
(throw (ex-info "remote graph is still creating" {:missionary/retry true} e))
(throw e))))))
(defn- ensure-register-graph-updates*
"Return a task: get or create a mws(missionary wrapped websocket).
see also `ws/get-mws-create`.
But ensure `register-graph-updates` has been sent"
[get-mws-create-task graph-uuid]
[get-ws-create-task graph-uuid]
(assert (some? graph-uuid))
(let [*sent (atom {}) ;; mws->bool
(let [*sent (atom {}) ;; ws->bool
]
(m/sp
(let [mws (m/? get-mws-create-task)]
(when-not (contains? @*sent mws)
(swap! *sent assoc mws false))
(when (not (@*sent mws))
(let [ws (m/? get-ws-create-task)]
(when-not (contains? @*sent ws)
(swap! *sent assoc ws false))
(when (not (@*sent ws))
(m/? (c.m/backoff
(take 5 c.m/delays) ;retry 5 times (32s) if remote-graph is creating
(register-graph-updates get-mws-create-task graph-uuid)))
(swap! *sent assoc mws true))
mws))))
(register-graph-updates get-ws-create-task graph-uuid)))
(swap! *sent assoc ws true))
ws))))
(def ensure-register-graph-updates (memoize ensure-register-graph-updates*))
(defn- remove-non-exist-block-uuids-in-add-retract-map
[conn add-retract-map]
(let [{:keys [add retract]} add-retract-map
@ -305,15 +306,15 @@
(defn new-task--push-local-ops
"Return a task: push local updates"
[repo conn user-uuid graph-uuid date-formatter get-mws-create-task add-log-fn]
[repo conn user-uuid graph-uuid date-formatter get-ws-create-task add-log-fn]
(m/sp
(when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
(sort-remote-ops
(gen-block-uuid->remote-ops repo conn user-uuid)))]
(op-mem-layer/new-branch! repo)
(let [local-tx (op-mem-layer/get-local-tx repo)
r (m/? (send&recv get-mws-create-task {:action "apply-ops" :graph-uuid graph-uuid
:ops ops-for-remote :t-before (or local-tx 1)}))]
r (m/? (send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
:ops ops-for-remote :t-before (or local-tx 1)}))]
(if-let [remote-ex (:ex-data r)]
(do (add-log-fn remote-ex)
(case (:type remote-ex)
@ -329,8 +330,8 @@
:graph-lock-missing
(do (op-mem-layer/rollback! repo)
(throw r.ex/ex-remote-graph-lock-missing))
;; TODO: support read s3-obj when websocket return specific data
:get-s3-object-failed
:rtc.exception/get-s3-object-failed
(do (op-mem-layer/rollback! repo)
nil)
;; else

View File

@ -1,8 +1,8 @@
(ns frontend.worker.rtc.core2
"Main(use missionary) ns for rtc related fns"
(:require [frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.const :as rtc-const]
[frontend.worker.rtc.exception :as r.ex]
[frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.remote-update :as r.remote-update]
[frontend.worker.rtc.ws2 :as ws]
@ -67,7 +67,6 @@
(create-local-updates-check-flow repo *auto-push? 2000))]
(c.m/mix remote-updates-flow local-updates-check-flow)))
(defn- new-task--get-ws-create
"Return a map with atom *current-ws and a task
that get current ws, create one if needed(closed or not created yet)"
@ -232,11 +231,11 @@
[token graph-uuid]
(let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
(m/sp
(let [{:keys [ex-data]}
(m/? (r.client/send&recv get-ws-create-task
{:action "delete-graph" :graph-uuid graph-uuid}))]
(when ex-data (prn ::delete-graph-failed graph-uuid ex-data))
(boolean (nil? ex-data))))))
(let [{:keys [ex-data]}
(m/? (r.client/send&recv get-ws-create-task
{:action "delete-graph" :graph-uuid graph-uuid}))]
(when ex-data (prn ::delete-graph-failed graph-uuid ex-data))
(boolean (nil? ex-data))))))
(defn new-task--get-user-info
"Return a task that return users-info about the graph."
@ -268,20 +267,20 @@
[]
(let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
(m/ap
(let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push?]}
(m/?< rtc-loop-metadata-flow)]
(try
(when (and repo rtc-state-flow *rtc-auto-push?)
(m/?<
(m/latest
(fn [rtc-state rtc-auto-push?]
{:graph-uuid graph-uuid
:user-uuid user-uuid
:unpushed-block-update-count (op-mem-layer/get-unpushed-block-update-count repo)
:rtc-state rtc-state
:auto-push? rtc-auto-push?})
rtc-state-flow (m/watch *rtc-auto-push?))))
(catch Cancelled _))))))
(let [{:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push?]}
(m/?< rtc-loop-metadata-flow)]
(try
(when (and repo rtc-state-flow *rtc-auto-push?)
(m/?<
(m/latest
(fn [rtc-state rtc-auto-push?]
{:graph-uuid graph-uuid
:user-uuid user-uuid
:unpushed-block-update-count (op-mem-layer/get-unpushed-block-update-count repo)
:rtc-state rtc-state
:auto-push? rtc-auto-push?})
rtc-state-flow (m/watch *rtc-auto-push?))))
(catch Cancelled _))))))
(defn new-task--get-debug-state
[]
@ -300,6 +299,14 @@
(r.client/send&recv get-ws-create-task {:action "snapshot-list"
:graph-uuid graph-uuid}))))
(defn new-task--upload-graph
[token repo remote-graph-name]
(m/sp
(if-let [conn (worker-state/get-datascript-conn repo)]
(let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
(m/? (r.upload-download/new-task--upload-graph get-ws-create-task repo conn remote-graph-name)))
(r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
:repo repo})))))
;;; subscribe debug state ;;;
@ -319,7 +326,6 @@
(subscribe-debug-state)
(comment
(do
(def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
@ -335,6 +341,4 @@
(def rtc-log-flow rtc-log-flow)
(def rtc-state-flow rtc-state-flow)
(def *rtc-auto-push? *rtc-auto-push?)))
(cancel)
)
(cancel))

View File

@ -22,6 +22,12 @@ Trying to start rtc loop but there's already one running, need to cancel that on
(sr/defkeyword :rtc.exception/not-found-db-conn
"Local exception. Cannot find db-conn by repo")
(sr/defkeyword :rtc.exception/get-s3-object-failed
"Failed to fetch response from s3.
When response from remote is too huge(> 32KB),
the server will put it to s3 and return its presigned-url to clients.")
(def ex-remote-graph-not-exist
(ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist}))

View File

@ -9,17 +9,20 @@
[cognitect.transit :as transit]
[datascript.core :as d]
[frontend.worker.async-util :include-macros true :refer [<? go-try]]
[frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.ws :as ws :refer [<send!]]
[frontend.worker.state :as worker-state]
[frontend.worker.util :as worker-util]
[logseq.common.missionary-util :as c.m]
[logseq.common.util.page-ref :as page-ref]
[logseq.db.frontend.content :as db-content]
[logseq.db.frontend.schema :as db-schema]
[logseq.outliner.core :as outliner-core]
[missionary.core :as m]
[promesa.core :as p]))
(def transit-r (transit/reader :json))
(def ^:private transit-r (transit/reader :json))
(defn- export-as-blocks
[db]
@ -39,6 +42,35 @@
{:db/id (:e (first datoms))}
datoms)))))))
(defn new-task--upload-graph
[get-ws-create-task repo conn remote-graph-name]
(m/sp
(let [[{:keys [url key]} all-blocks-str]
(m/?
(m/join
vector
(r.client/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
(m/sp
(let [all-blocks (export-as-blocks @conn)]
(transit/write (transit/writer :json) all-blocks)))))]
(m/? (c.m/<! (http/put url {:body all-blocks-str})))
(let [upload-resp
(m/? (r.client/send&recv get-ws-create-task {:action "upload-graph"
:s3-key key
:graph-name remote-graph-name}))]
(if-let [graph-uuid (:graph-uuid upload-resp)]
(let [^js worker-obj (:worker/object @worker-state/*state)]
(d/transact! conn
[{:db/ident :logseq.kv/graph-uuid :graph/uuid graph-uuid}
{:db/ident :logseq.kv/graph-local-tx :graph/local-tx "0"}])
(m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:graph/uuid graph-uuid}))))
(op-mem-layer/init-empty-ops-store! repo)
(op-mem-layer/update-graph-uuid! repo graph-uuid)
(op-mem-layer/update-local-tx! repo 8)
(m/? (c.m/<! (op-mem-layer/<sync-to-idb-layer! repo)))
nil)
(throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))))))
(defn <async-upload-graph
[state repo conn remote-graph-name]
(go
@ -169,7 +201,6 @@
(worker-util/post-message :add-repo {:repo repo}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;
;; async download-graph ;;
;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -182,7 +213,6 @@
:graph-uuid graph-uuid}))]
download-info-uuid)))
(defn <wait-download-info-ready
[state download-info-uuid graph-uuid timeout-ms]
(let [init-interval 1000

View File

@ -96,36 +96,6 @@
(throw (ex-info "failed to open websocket conn"
{:missionary/retry true}
e)))))))
(comment
(defn get-mws-create
"Returns a task to get a mws(missionary-websocket), creating one if needed.
Always try to produce NOT-closed websocket.
When failed to open websocket, retry with backoff.
TODO: retry ASAP once network condition changed"
[url & {:keys [retry-count open-ws-timeout]
:or {retry-count 10 open-ws-timeout 10000}}]
(assert (and (pos-int? retry-count)
(pos-int? open-ws-timeout))
[retry-count open-ws-timeout])
(let [*last-m-ws (atom nil)
backoff-create-ws-task
(c.m/backoff
(take retry-count c.m/delays)
(m/sp
(let [m-ws
(try
(m/? (m/timeout (create-mws* url) open-ws-timeout))
(catch js/CloseEvent e
(throw (ex-info "failed to open websocket conn"
{:missionary/retry true}
e))))]
(reset! *last-m-ws m-ws)
m-ws)))]
(m/sp
(let [m-ws @*last-m-ws]
(if (and m-ws (not (closed? m-ws)))
m-ws
(m/? backoff-create-ws-task)))))))
(defn create-mws-state-flow
[mws]
@ -209,5 +179,5 @@
(js->clj (js/JSON.parse body) :keywordize-keys true)
{:req-id req-id
:ex-message "get s3 object failed"
:ex-data {:type :get-s3-object-failed :status status :body body}}))
:ex-data {:type :rtc.exception/get-s3-object-failed :status status :body body}}))
resp))))