enhance: replace sync/sync-events-publication with pubsub/sync-events-pub

pull/8366/head
rcmerci 2023-01-13 14:58:36 +08:00 committed by Andelf
parent 7cc3645095
commit 316dc6bc01
3 changed files with 9 additions and 40 deletions

View File

@ -1050,18 +1050,9 @@
;;; ### sync events
;; "`SyncManager` will put some internal sync events to this chan.
;; see also spec `::sync-event`"
(defonce ^:private sync-events-chan
(chan (async/sliding-buffer 1000)))
;; see also spec `::event` for topic list
(defonce sync-events-publication
(async/pub sync-events-chan :event))
(defn- put-sync-event!
[val]
{:pre [(s/valid? ::sync-event val)]}
(async/put! sync-events-chan val))
(async/put! pubsub/sync-events-ch val))
(def ^:private debug-print-sync-events-loop-stop-chan (chan 1))
(defn debug-print-sync-events-loop
@ -1079,7 +1070,7 @@
out-ch (chan 10)
out-mix (async/mix out-ch)]
(doseq [[topic ch] topic&chs]
(async/sub sync-events-publication topic ch)
(async/sub pubsub/sync-events-pub topic ch)
(async/admix out-mix ch))
(go-loop []
(let [{:keys [val stop]}
@ -1089,7 +1080,7 @@
(cond
stop (do (async/unmix-all out-mix)
(doseq [[topic ch] topic&chs]
(async/unsub sync-events-publication topic ch)))
(async/unsub pubsub/sync-events-pub topic ch)))
val (do (pp/pprint [:debug :sync-event val])
(recur))))))))
@ -1100,30 +1091,6 @@
(offer! debug-print-sync-events-loop-stop-chan true))
(comment
;; sub one type event example:
(def c1 (chan 10))
(async/sub sync-events-publication :created-local-version-file c1)
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
(poll! c1)
;; sub multiple type events example:
;; sub :created-local-version-file and :finished-remote->local events,
;; output into channel c4-out
(def c2 (chan 10))
(def c3 (chan 10))
(def c4-out (chan 10))
(def mix-out (async/mix c4-out))
(async/admix mix-out c2)
(async/admix mix-out c3)
(async/sub sync-events-publication :created-local-version-file c2)
(async/sub sync-events-publication :finished-remote->local c3)
(offer! sync-events-chan {:event :created-local-version-file :data :xxx})
(offer! sync-events-chan {:event :finished-remote->local :data :xxx})
(poll! c4-out)
(poll! c4-out)
)
;;; sync events ends
(defn- fire-file-sync-storage-exceed-limit-event!
@ -3316,7 +3283,7 @@
;; re-exec remote->local-full-sync when it failed before
(def re-remote->local-full-sync-chan (chan 1))
(async/sub sync-events-publication :remote->local-full-sync-failed re-remote->local-full-sync-chan)
(async/sub pubsub/sync-events-pub :remote->local-full-sync-failed re-remote->local-full-sync-chan)
(go-loop []
(let [{{graph-uuid :graph-uuid} :data} (<! re-remote->local-full-sync-chan)
{:keys [current-syncing-graph-uuid]}
@ -3327,7 +3294,7 @@
;; re-exec local->remote-full-sync when it failed
(def re-local->remote-full-sync-chan (chan 1))
(async/sub sync-events-publication :local->remote-full-sync-failed re-local->remote-full-sync-chan)
(async/sub pubsub/sync-events-pub :local->remote-full-sync-failed re-local->remote-full-sync-chan)
(go-loop []
(let [{{graph-uuid :graph-uuid} :data} (<! re-local->remote-full-sync-chan)
{:keys [current-syncing-graph-uuid]} (state/get-file-sync-state graph-uuid)]

View File

@ -12,6 +12,7 @@
[frontend.state :as state]
[frontend.handler.user :as user]
[frontend.fs :as fs]
[frontend.pubsub :as pubsub]
[cljs-time.coerce :as tc]
[cljs-time.core :as t]
[frontend.storage :as storage]
@ -197,7 +198,7 @@
(defn setup-file-sync-event-listeners
[]
(let [c (async/chan 1)
p sync/sync-events-publication
p pubsub/sync-events-pub
topics [:finished-local->remote :finished-remote->local :start]]
(doseq [topic topics]
(async/sub p topic c))

View File

@ -72,4 +72,5 @@
:get-remote-graph-failed
:get-deletion-logs-failed]]
[:data :map]]
:topic-fn :event)
:topic-fn :event
:ch-buffer (a/sliding-buffer 10))