fix: db requests could be dropped if some requests not finished yet

pull/10793/head
Tienson Qin 2024-01-14 12:55:55 +08:00
parent 74a76f0365
commit b55e4e7380
5 changed files with 87 additions and 71 deletions

View File

@ -59,30 +59,47 @@
(defn get-deferred-response
[request-id]
(get @*request-id->response request-id))
(:response (get @*request-id->response request-id)))
;; run the next request
(add-watch *request-id->response :loop-execute-requests
(fn [_ _ _ new]
(when-let [request-id (some->> (keys new)
sort
first)]
(when-let [callback (:callback (get new request-id))]
(callback)))))
(defn transact!
([conn tx-data]
(transact! conn tx-data nil))
([conn tx-data tx-meta]
(let [tx-data (common-util/fast-remove-nils tx-data)]
(let [tx-data (common-util/fast-remove-nils tx-data)
request-finished? (request-finished?)]
;; Ensure worker can handle the request sequentially (one by one)
;; Because UI assumes that the in-memory db has all the data except the last one transaction
(when (and (seq tx-data) (request-finished?))
(when (seq tx-data)
;; (prn :debug :transact)
;; (cljs.pprint/pprint tx-data)
(let [f (or @*transact-fn d/transact!)
sync? (= f d/transact!)
request-id (swap! *request-id inc)
tx-meta' (if sync? tx-meta
(assoc tx-meta :request-id request-id))
result (f conn tx-data tx-meta')]
(if sync? result
(let [resp (p/deferred)]
(swap! *request-id->response assoc request-id resp)
resp)))))))
request-id (when-not sync? (swap! *request-id inc))
tx-meta' (cond-> tx-meta
(not sync?)
(assoc :request-id request-id))]
(if sync?
(f conn tx-data tx-meta')
(let [resp (p/deferred)]
(when request-finished?
(f conn tx-data tx-meta'))
(let [value (if request-finished?
{:response resp}
{:response resp
:callback #(f conn tx-data tx-meta')})]
(swap! *request-id->response assoc request-id value))
resp)))))))
(defn build-default-pages-tx
[]

View File

@ -99,12 +99,12 @@ Options available:
blocks []
ast []}}
(cond (contains? common-config/mldoc-support-formats format)
(extract/extract file content extract-options')
(extract/extract file content extract-options')
(common-config/whiteboard? file)
(extract/extract-whiteboard-edn file content extract-options')
(common-config/whiteboard? file)
(extract/extract-whiteboard-edn file content extract-options')
:else nil)
:else nil)
block-ids (map (fn [block] {:block/uuid (:block/uuid block)}) blocks)
delete-blocks (delete-blocks-fn @conn (first pages) file block-ids)
block-refs-ids (->> (mapcat :block/refs blocks)
@ -121,13 +121,12 @@ Options available:
:ast ast})
tx (concat tx [(cond-> {:file/path file
:file/content content}
new?
new?
;; TODO: use file system timestamp?
(assoc :file/created-at (common-util/time-ms)))])
tx' (common-util/fast-remove-nils tx)
(assoc :file/created-at (common-util/time-ms)))])
result (if skip-db-transact?
tx'
(ldb/transact! conn tx' (select-keys options [:new-graph? :from-disk?])))]
tx
(ldb/transact! conn tx (select-keys options [:new-graph? :from-disk?])))]
{:tx result
:ast ast})))

View File

@ -2828,7 +2828,7 @@
(save-current-block!)
(remove-block-own-order-list-type! block))
(p/let [*edit-block-fn (atom nil)
_ (delete-block! repo false :*edit-block-fn *edit-block-fn)]
result (delete-block! repo false :*edit-block-fn *edit-block-fn)]
(when-let [f @*edit-block-fn]
(f))))))

View File

@ -952,13 +952,12 @@
;; db-worker -> UI
(defmethod handle :db/sync-changes [[_ {:keys [request-id] :as data}]]
(when request-id ; request-id could be nil sometimes
(let [repo (state/get-current-repo)]
(pipeline/invoke-hooks data)
(let [repo (state/get-current-repo)]
(pipeline/invoke-hooks data)
(ipc/ipc :db-transact repo (pr-str (:tx-data data)) (pr-str (:tx-meta data)))
(state/pub-event! [:search/transact-data repo (:search-indice data)])
nil)))
(ipc/ipc :db-transact repo (pr-str (:tx-data data)) (pr-str (:tx-meta data)))
(state/pub-event! [:search/transact-data repo (:search-indice data)])
nil))
(defn run!
[]

View File

@ -40,60 +40,61 @@
(defn invoke-hooks
[{:keys [request-id tx-meta tx-data deleted-block-uuids affected-keys blocks] :as opts}]
(when request-id
(let [{:keys [from-disk? new-graph? local-tx? undo? redo?]} tx-meta
repo (state/get-current-repo)
tx-report {:tx-meta tx-meta
:tx-data tx-data}]
;; (prn :debug :request-id request-id)
(let [{:keys [from-disk? new-graph? local-tx? undo? redo?]} tx-meta
repo (state/get-current-repo)
tx-report {:tx-meta tx-meta
:tx-data tx-data}]
(let [conn (db/get-db repo false)
tx-report (d/transact! conn tx-data tx-meta)]
(when local-tx?
(let [tx-id (get-tx-id tx-report)]
(store-undo-data! (assoc opts :tx-id tx-id))))
(when-not (or undo? redo?)
(update-current-tx-editor-cursor! tx-report)))
(let [conn (db/get-db repo false)
tx-report (d/transact! conn tx-data tx-meta)]
(when local-tx?
(let [tx-id (get-tx-id tx-report)]
(store-undo-data! (assoc opts :tx-id tx-id))))
(when-not (or undo? redo?)
(update-current-tx-editor-cursor! tx-report)))
(let [pages (set (keep #(when (= :block/name (:a %)) (:v %)) tx-data))]
(when (seq pages)
(mark-pages-as-loaded! repo pages)))
(let [pages (set (keep #(when (= :block/name (:a %)) (:v %)) tx-data))]
(when (seq pages)
(mark-pages-as-loaded! repo pages)))
(when (= (:outliner-op tx-meta) :delete-page)
(state/pub-event! [:page/deleted repo (:deleted-page tx-meta) (:file-path tx-meta)]))
(when (= (:outliner-op tx-meta) :delete-page)
(state/pub-event! [:page/deleted repo (:deleted-page tx-meta) (:file-path tx-meta)]))
(when (= (:outliner-op tx-meta) :rename-page)
(state/pub-event! [:page/renamed repo (:data tx-meta)]))
(when (= (:outliner-op tx-meta) :rename-page)
(state/pub-event! [:page/renamed repo (:data tx-meta)]))
(if (or from-disk? new-graph?)
(do
(react/clear-query-state!)
(ui-handler/re-render-root!))
(when-not (:graph/importing @state/state)
(react/refresh! repo tx-report affected-keys)
(if (or from-disk? new-graph?)
(do
(react/clear-query-state!)
(ui-handler/re-render-root!))
(when-not (:graph/importing @state/state)
(react/refresh! repo tx-report affected-keys)
(when-let [state (:ui/restore-cursor-state @state/state)]
(when (or undo? redo?)
(restore-cursor-and-app-state! state undo?)
(state/set-state! :ui/restore-cursor-state nil)))
(when-let [state (:ui/restore-cursor-state @state/state)]
(when (or undo? redo?)
(restore-cursor-and-app-state! state undo?)
(state/set-state! :ui/restore-cursor-state nil)))
(state/set-state! :editor/start-pos nil)
(state/set-state! :editor/start-pos nil)
(when (and state/lsp-enabled?
(seq blocks)
(<= (count blocks) 1000))
(state/pub-event! [:plugin/hook-db-tx
{:blocks blocks
:deleted-block-uuids deleted-block-uuids
:tx-data (:tx-data tx-report)
:tx-meta (:tx-meta tx-report)}]))))
(when (and state/lsp-enabled?
(seq blocks)
(<= (count blocks) 1000))
(state/pub-event! [:plugin/hook-db-tx
{:blocks blocks
:deleted-block-uuids deleted-block-uuids
:tx-data (:tx-data tx-report)
:tx-meta (:tx-meta tx-report)}]))))
(when-let [deleting-block-id (:ui/deleting-block @state/state)]
(when (some (fn [datom] (and
(= :block/uuid (:a datom))
(= (:v datom) deleting-block-id)
(true? (:added datom)))) tx-data) ; editing-block was added back (could be undo or from remote sync)
(state/set-state! :ui/deleting-block nil)))
(when-let [deleting-block-id (:ui/deleting-block @state/state)]
(when (some (fn [datom] (and
(= :block/uuid (:a datom))
(= (:v datom) deleting-block-id)
(true? (:added datom)))) tx-data) ; editing-block was added back (could be undo or from remote sync)
(state/set-state! :ui/deleting-block nil)))
(when request-id
(when-let [deferred (ldb/get-deferred-response request-id)]
(p/resolve! deferred {:tx-meta tx-meta
:tx-data tx-data})