fix: use core.async channel to ensure sequential db transactions

pull/11102/head
Tienson Qin 2024-03-03 06:38:35 +08:00
parent 215e65a249
commit 33a532455b
6 changed files with 77 additions and 102 deletions

View File

@ -10,7 +10,9 @@
[clojure.set :as set]
[logseq.db.frontend.rules :as rules]
[logseq.db.frontend.entity-plus]
[promesa.core :as p]))
[promesa.core :as p]
[clojure.core.async :as async]
[clojure.core.async.interop :refer [p->c]]))
;; Use it as an input argument for datalog queries
(def block-attrs
@ -50,6 +52,7 @@
(when f (reset! *transact-fn f)))
(defonce *request-id (atom 0))
(defonce requests (async/chan 1000))
(defonce *request-id->response (atom {}))
(defn request-finished?
@ -57,40 +60,29 @@
[]
(empty? @*request-id->response))
(defn get-deferred-response
[request-id]
(:response (get @*request-id->response request-id)))
;; run the next request
(add-watch *request-id->response :loop-execute-requests
(fn [_ _ _ new]
(when-let [request-id (some->> (keys new)
sort
first)]
(when-let [callback (:callback (get new request-id))]
(callback)))))
(async/go-loop []
(when-let [{:keys [id request response]} (async/<! requests)]
(let [result (async/<! (p->c (request)))]
(p/resolve! response result)
(swap! *request-id->response dissoc id))
(recur)))
(defn get-next-request-id
[]
(swap! *request-id inc))
(defn add-request!
[request-id data]
(swap! *request-id->response assoc request-id (if (map? data) data {:response data})))
(defn transact!
"`repo-or-conn`: repo for UI thread and conn for worker/node"
([repo-or-conn tx-data]
(transact! repo-or-conn tx-data nil))
([repo-or-conn tx-data tx-meta]
(let [tx-data (->> (common-util/fast-remove-nils tx-data)
(remove empty?))
request-finished? (request-finished?)]
(remove empty?))]
;; Ensure worker can handle the request sequentially (one by one)
;; Because UI assumes that the in-memory db has all the data except the last one transaction
(when (seq tx-data)
;; (prn :debug :transact)
;; (prn :debug :transact :sync? (= d/transact! (or @*transact-fn d/transact!)))
;; (cljs.pprint/pprint tx-data)
(let [f (or @*transact-fn d/transact!)
@ -101,14 +93,11 @@
(assoc :request-id request-id))]
(if sync?
(f repo-or-conn tx-data tx-meta')
(let [resp (p/deferred)]
(when request-finished?
(f repo-or-conn tx-data tx-meta'))
(let [value (if request-finished?
{:response resp}
{:response resp
:callback #(f repo-or-conn tx-data tx-meta')})]
(add-request! request-id value))
(let [resp (p/deferred)
new-request {:id request-id
:request #(f repo-or-conn tx-data tx-meta')
:response resp}]
(async/go (async/>! requests new-request))
resp)))))))
(defn build-default-pages-tx

View File

@ -66,5 +66,3 @@
(conn/start! repo option)))
(def new-block-id ldb/new-block-id)
(def request-finished? ldb/request-finished?)

View File

@ -356,21 +356,14 @@
true
(dissoc :insert-blocks?)))]
(if (and (:create-today-journal? tx-meta)
(:today-journal-name tx-meta)
(seq tx-data)
(d/entity @conn [:block/name (:today-journal-name tx-meta)])) ; today journal created already
(when-not (and (:create-today-journal? tx-meta)
(:today-journal-name tx-meta)
(seq tx-data)
(d/entity @conn [:block/name (:today-journal-name tx-meta)])) ; today journal created already
;; remove task from ldb/*request-id->response
(worker-util/post-message :sync-db-changes (pr-str
{:request-id (:request-id tx-meta)
:repo repo
:tx-data []
:tx-meta nil}))
(do
;; (prn :debug :transact :tx-data tx-data :tx-meta tx-meta')
(worker-util/profile "Worker db transact"
(ldb/transact! conn tx-data tx-meta'))))
;; (prn :debug :transact :tx-data tx-data :tx-meta tx-meta')
(worker-util/profile "Worker db transact"
(ldb/transact! conn tx-data tx-meta')))
nil)
(catch :default e
(prn :debug :error)

View File

@ -37,7 +37,7 @@
(history/restore-app-state! app-state))
(defn invoke-hooks
[{:keys [request-id tx-meta tx-data deleted-block-uuids affected-keys blocks] :as opts}]
[{:keys [_request-id tx-meta tx-data deleted-block-uuids affected-keys blocks] :as opts}]
;; (prn :debug
;; :request-id request-id
;; :tx-meta tx-meta
@ -47,12 +47,21 @@
tx-report {:tx-meta tx-meta
:tx-data tx-data}
conn (db/get-db repo false)]
(if initial-pages?
(cond
initial-pages?
(do
(util/profile "transact initial-pages" (d/transact! conn tx-data tx-meta))
(when end?
(state/pub-event! [:init/commands])
(ui-handler/re-render-root!)))
(or from-disk? new-graph?)
(do
(d/transact! conn tx-data tx-meta)
(react/clear-query-state!)
(ui-handler/re-render-root!))
:else
(do
(let [tx-data' (if (= (:outliner-op tx-meta) :insert-blocks)
(let [update-blocks-fully-loaded (keep (fn [datom] (when (= :block/uuid (:a datom))
@ -67,35 +76,31 @@
(when-not (or undo? redo?)
(update-current-tx-editor-cursor! tx-report)))
(let [new-datoms (filter (fn [datom]
(and
(= :block/uuid (:a datom))
(true? (:added datom)))) tx-data)]
(when (seq new-datoms)
(state/set-state! :editor/new-created-blocks (set (map :v new-datoms)))))
(when-not (:graph/importing @state/state)
(let [new-datoms (filter (fn [datom]
(and
(= :block/uuid (:a datom))
(true? (:added datom)))) tx-data)]
(when (seq new-datoms)
(state/set-state! :editor/new-created-blocks (set (map :v new-datoms)))))
(if (or from-disk? new-graph?)
(do
(react/clear-query-state!)
(ui-handler/re-render-root!))
(when-not (:graph/importing @state/state)
(react/refresh! repo tx-report affected-keys)
(react/refresh! repo tx-report affected-keys)
(when-let [state (:ui/restore-cursor-state @state/state)]
(when (or undo? redo?)
(restore-cursor-and-app-state! state undo?)
(state/set-state! :ui/restore-cursor-state nil)))
(when-let [state (:ui/restore-cursor-state @state/state)]
(when (or undo? redo?)
(restore-cursor-and-app-state! state undo?)
(state/set-state! :ui/restore-cursor-state nil)))
(state/set-state! :editor/start-pos nil)
(state/set-state! :editor/start-pos nil)
(when (and state/lsp-enabled?
(seq blocks)
(<= (count blocks) 1000))
(state/pub-event! [:plugin/hook-db-tx
{:blocks blocks
:deleted-block-uuids deleted-block-uuids
:tx-data (:tx-data tx-report)
:tx-meta (:tx-meta tx-report)}]))))))
(when (and state/lsp-enabled?
(seq blocks)
(<= (count blocks) 1000))
(state/pub-event! [:plugin/hook-db-tx
{:blocks blocks
:deleted-block-uuids deleted-block-uuids
:tx-data (:tx-data tx-report)
:tx-meta (:tx-meta tx-report)}])))))
(when (= (:outliner-op tx-meta) :delete-page)
(state/pub-event! [:page/deleted repo (:deleted-page tx-meta) (:file-path tx-meta) tx-meta]))
@ -108,10 +113,4 @@
(= :block/uuid (:a datom))
(= (:v datom) deleting-block-id)
(true? (:added datom)))) tx-data) ; editing-block was added back (could be undo or from remote sync)
(state/set-state! :ui/deleting-block nil)))
(when request-id
(when-let [deferred (ldb/get-deferred-response request-id)]
(when (p/promise? deferred)
(p/resolve! deferred {:tx-meta tx-meta :tx-data tx-data})))
(swap! ldb/*request-id->response dissoc request-id))))
(state/set-state! :ui/deleting-block nil)))))

View File

@ -7,26 +7,23 @@
(defmacro transact!
[opts & body]
`(let [test?# frontend.util/node-test?]
(when (or test?# (db/request-finished?))
(when (nil? @(:history/tx-before-editor-cursor @state/state))
(state/set-state! :history/tx-before-editor-cursor (state/get-current-edit-block-and-position)))
(let [ops# frontend.modules.outliner.op/*outliner-ops*]
(if ops#
(do ~@body) ; nested transact!
(binding [frontend.modules.outliner.op/*outliner-ops* (transient [])]
~@body
(let [r# (persistent! frontend.modules.outliner.op/*outliner-ops*)
worker# @state/*db-worker]
(if (and test?# (seq r#))
(logseq.outliner.op/apply-ops! (state/get-current-repo)
(db/get-db false)
r#
(state/get-date-formatter)
~opts)
(when (and worker# (seq r#))
(let [request-id# (state/get-worker-next-request-id)
response# (.apply-outliner-ops ^Object worker# (state/get-current-repo)
(pr-str r#)
(pr-str (assoc ~opts :request-id request-id#)))]
(state/add-worker-request! request-id# :outliner-tx)
response#))))))))))
(when (nil? @(:history/tx-before-editor-cursor @state/state))
(state/set-state! :history/tx-before-editor-cursor (state/get-current-edit-block-and-position)))
(let [ops# frontend.modules.outliner.op/*outliner-ops*]
(if ops#
(do ~@body) ; nested transact!
(binding [frontend.modules.outliner.op/*outliner-ops* (transient [])]
~@body
(let [r# (persistent! frontend.modules.outliner.op/*outliner-ops*)
worker# @state/*db-worker]
(if (and test?# (seq r#))
(logseq.outliner.op/apply-ops! (state/get-current-repo)
(db/get-db false)
r#
(state/get-date-formatter)
~opts)
(when (and worker# (seq r#))
(let [request-id# (state/get-worker-next-request-id)]
(.apply-outliner-ops ^Object worker# (state/get-current-repo)
(pr-str r#)
(pr-str (assoc ~opts :request-id request-id#))))))))))))

View File

@ -2376,4 +2376,3 @@ Similar to re-frame subscriptions"
(update-state! :favorites/updated? inc))
(def get-worker-next-request-id ldb/get-next-request-id)
(def add-worker-request! ldb/add-request!)