feat(rtc.asset): add asset-sync-loop into rtc-sync-loop

feat/asset-sync
rcmerci 2024-07-07 20:54:55 +08:00
parent fa14834071
commit b8c8a1be40
4 changed files with 21 additions and 17 deletions

View File

@ -117,7 +117,7 @@
(defonce ^:private global-asset-change-event-flow
(m/buffer 20 (m/watch *global-asset-change-event)))
(defn- create-assets-sync-loop
(defn create-assets-sync-loop
[get-ws-create-task graph-uuid conn]
(let [started-dfv (m/dfv)
asset-change-event-flow global-asset-change-event-flow

View File

@ -1,11 +1,10 @@
(ns frontend.worker.rtc.asset-db-listener
"Listen asset-block changes in db, generate asset-sync operations"
(:require [frontend.schema-register :as sr]
(:require [datascript.core :as d]
[frontend.schema-register :as sr]
[frontend.worker.db-listener :as db-listener]
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]
[frontend.worker.rtc.asset :as r.asset]
[datascript.core :as d]))
[frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
(defn entity-datoms=>action+asset-uuid
[db-after entity-datoms]
@ -26,18 +25,14 @@
(fn [action->asset-uuids [action asset-uuid]]
(update action->asset-uuids action (fnil conj #{}) asset-uuid))
{}))]
))
(reset! r.asset/*global-asset-change-event action->asset-uuids)))
(sr/defkeyword :generate-asset-change-events?
"tx-meta option, generate events to notify asset-sync (default true)")
(defmethod db-listener/listen-db-changes :gen-asset-change-events
[_ {:keys [_tx-data tx-meta db-before db-after
repo id->attr->datom e->a->add?->v->t same-entity-datoms-coll]}]
[_ {:keys [_tx-data tx-meta _db-before db-after
repo _id->attr->datom _e->a->add?->v->t same-entity-datoms-coll]}]
(when (and (op-mem-layer/rtc-db-graph? repo)
(:generate-asset-change-events? tx-meta true))
)
)
(generate-asset-change-events db-after same-entity-datoms-coll)))

View File

@ -1,6 +1,7 @@
(ns frontend.worker.rtc.core
"Main(use missionary) ns for rtc related fns"
(:require [frontend.common.missionary-util :as c.m]
[frontend.worker.rtc.asset :as r.asset]
[frontend.worker.rtc.client :as r.client]
[frontend.worker.rtc.exception :as r.ex]
[frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
@ -148,7 +149,8 @@
get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
(assert (some? *current-ws))
{:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
{:get-ws-create-task get-ws-create-task
:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
:*rtc-auto-push? *auto-push?
:*online-users *online-users
:onstarted-task started-dfv
@ -204,9 +206,14 @@
(let [user-uuid (:sub (worker-util/parse-jwt token))
config (worker-state/get-config repo)
date-formatter (common-config/get-date-formatter config)
{:keys [onstarted-task rtc-state-flow *rtc-auto-push? rtc-loop-task *online-users]}
{:keys [get-ws-create-task onstarted-task rtc-state-flow
*rtc-auto-push? rtc-loop-task *online-users]}
(create-rtc-loop graph-uuid repo conn date-formatter token)
canceler (c.m/run-task rtc-loop-task :rtc-loop-task)
{:keys [assets-sync-loop-task]}
(r.asset/create-assets-sync-loop get-ws-create-task graph-uuid conn)
canceler1 (c.m/run-task rtc-loop-task :rtc-loop-task)
canceler2 (c.m/run-task assets-sync-loop-task :assets-sync-loop-task)
canceler #(do (canceler1) (canceler2))
start-ex (m/? onstarted-task)]
(if (:ex-data start-ex)
(r.ex/->map start-ex)

View File

@ -19,7 +19,9 @@
:rtc.log/download
:rtc.log/cancelled
:rtc.log/apply-remote-update
:rtc.log/push-local-update])
:rtc.log/push-local-update
:rtc.asset.log/cancelled])
(def ^:private rtc-log-type-validator (ma/validator rtc-log-type-schema))