diff --git a/src/main/frontend/worker/rtc/client.cljs b/src/main/frontend/worker/rtc/client.cljs index 9ab9285ed..dd0291bb5 100644 --- a/src/main/frontend/worker/rtc/client.cljs +++ b/src/main/frontend/worker/rtc/client.cljs @@ -24,8 +24,10 @@ "Return a task: throw exception if recv ex-data response" [get-mws-create-task message] (m/sp - (handle-remote-ex - (m/? (ws/send&recv get-mws-create-task message))))) + (let [mws (m/? get-mws-create-task)] + (handle-remote-ex + (rtc-const/data-from-ws-coercer + (m/? (ws/send&recv mws message))))))) (defn- register-graph-updates [get-mws-create-task graph-uuid] @@ -38,14 +40,15 @@ But ensure `register-graph-updates` has been sent" [get-mws-create-task graph-uuid] (assert (some? graph-uuid)) - (let [*sent (atom {})] + (let [*sent (atom {}) ;; mws->bool + ] (m/sp (let [mws (m/? get-mws-create-task)] - (when (contains? @*sent mws) - (swap! *sent mws false)) + (when-not (contains? @*sent mws) + (swap! *sent assoc mws false)) (when (not (@*sent mws)) - (m/? (register-graph-updates (m/sp mws) graph-uuid)) - (swap! *sent mws true)) + (m/? (register-graph-updates get-mws-create-task graph-uuid)) + (swap! *sent assoc mws true)) mws)))) (defn- remove-non-exist-block-uuids-in-add-retract-map @@ -328,5 +331,6 @@ (do (assert (pos? (:t r)) r) (op-mem-layer/commit! repo) - (r.remote-update/apply-remote-update repo conn date-formatter r add-log-fn) + (r.remote-update/apply-remote-update + repo conn date-formatter {:type :remote-update :value r} add-log-fn) (add-log-fn {:type ::push-client-updates :remote-t (:t r)}))))))) diff --git a/src/main/frontend/worker/rtc/core2.cljs b/src/main/frontend/worker/rtc/core2.cljs index beba2734b..beb51ede4 100644 --- a/src/main/frontend/worker/rtc/core2.cljs +++ b/src/main/frontend/worker/rtc/core2.cljs @@ -1,17 +1,20 @@ (ns frontend.worker.rtc.core2 "Main(use missionary) ns for rtc related fns" (:require [frontend.worker.rtc.client :as r.client] + [frontend.worker.rtc.const :as rtc-const] + [frontend.worker.rtc.op-mem-layer :as op-mem-layer] [frontend.worker.rtc.remote-update :as r.remote-update] [frontend.worker.rtc.ws2 :as ws] [frontend.worker.state :as worker-state] [goog.string :as gstring] [logseq.common.missionary-util :as c.m] [malli.core :as ma] - [missionary.core :as m])) + [missionary.core :as m]) + (:import [missionary Cancelled])) (def ^:private rtc-state-schema [:map - [:ws-state [:enum :connecting :open :closing :closed]]]) + [:ws-state {:optional true} [:enum :connecting :open :closing :closed]]]) (def ^:private rtc-state-validator (ma/validator rtc-state-schema)) (defn- get-ws-url @@ -28,7 +31,9 @@ (let [mws (m/? get-mws-create-task) x (try (m/?> (m/eduction - (filter (fn [data] (= "push-updates" (:req-id data)))) + (keep (fn [data] + (when (= "push-updates" (:req-id data)) + (rtc-const/data-from-ws-coercer data)))) (ws/recv-flow mws))) (catch js/CloseEvent _ sentinel))] @@ -37,13 +42,14 @@ (m/amb x (recur)))))))) (defn- create-local-updates-check-flow - "Return a flow" - [*auto-push? interval-ms] + "Return a flow: emit if need to push local-updates" + [repo *auto-push? interval-ms] (let [auto-push-flow (m/watch *auto-push?) clock-flow (c.m/clock interval-ms :clock) merge-flow (m/latest vector auto-push-flow clock-flow)] (m/eduction (filter first) (map second) + (filter (fn [v] (when (pos? (op-mem-layer/get-unpushed-block-update-count repo)) v))) merge-flow))) (comment @@ -57,13 +63,13 @@ "Return a flow that emits all kinds of events: `:remote-update`: remote-updates data from server `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote." - [get-mws-create-task *auto-push?] + [repo get-mws-create-task *auto-push?] (let [remote-updates-flow (m/eduction (map (fn [data] {:type :remote-update :value data})) (get-remote-updates get-mws-create-task)) local-updates-check-flow (m/eduction (map (fn [data] {:type :local-update-check :value data})) - (create-local-updates-check-flow *auto-push? 2000))] + (create-local-updates-check-flow repo *auto-push? 2000))] (c.m/mix remote-updates-flow local-updates-check-flow))) (defn- create-get-mws-create-task @@ -86,18 +92,20 @@ (defn- create-mws-state-flow [*current-mws] - (m/ap - (if-let [mws (m/?< (m/watch *current-mws))] - (m/?< (ws/create-mws-state-flow mws)) - (m/amb)))) + (m/relieve + (m/ap + (if-let [mws (m/?< (m/watch *current-mws))] + (m/?< (ws/create-mws-state-flow mws)) + (m/amb))))) (defn- create-rtc-state-flow [mws-state-flow] (m/latest (fn [mws-state] {:post [(rtc-state-validator %)]} - {:ws-state mws-state}) - mws-state-flow)) + (cond-> {} + mws-state (assoc :ws-state mws-state))) + (m/reductions {} nil mws-state-flow))) (defn create-rtc-loop "Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push?] @@ -110,26 +118,50 @@ add-log-fn #(reset! *log [(js/Date.) %]) [*current-mws get-mws-create-task] (create-get-mws-create-task ws-url) get-mws-create-task (r.client/ensure-register-graph-updates get-mws-create-task graph-uuid) - mixed-flow (create-mixed-flow get-mws-create-task *auto-push?)] + mixed-flow (create-mixed-flow repo get-mws-create-task *auto-push?)] {:rtc-log-flow (m/buffer 100 (m/watch *log)) :rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws)) :*rtc-auto-push? *auto-push? :rtc-loop-task (m/sp - ;; init run to open a ws - (m/? get-mws-create-task) - (->> - (let [event (m/?> mixed-flow)] - (case (:type event) - :remote-update - (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn) + (try + ;; init run to open a ws + (m/? get-mws-create-task) + (->> + (let [event (m/?> mixed-flow)] + (case (:type event) + :remote-update + (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn) - :local-update-check - (m/? (r.client/create-push-local-ops-task - repo conn user-uuid graph-uuid date-formatter - get-mws-create-task add-log-fn)))) - (m/ap) - (m/reduce {}) - (m/?)))})) + :local-update-check + (m/? (r.client/create-push-local-ops-task + repo conn user-uuid graph-uuid date-formatter + get-mws-create-task add-log-fn)))) + (m/ap) + (m/reduce {} nil) + (m/?)) + (catch Cancelled _ + (add-log-fn {:type ::cancelled}))))})) (def send&recv r.client/send&recv) + +(comment + (do + (def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072") + (def graph-uuid "ff7186c1-5903-4bc8-b4e9-ca23525b9983") + (def repo "logseq_db_4-23") + (def conn (worker-state/get-datascript-conn repo)) + (def date-formatter "MMM do, yyyy") + (def debug-ws-url "wss://ws-dev.logseq.com/rtc-sync?token=???") + (let [{:keys [rtc-log-flow rtc-state-flow *rtc-auto-push? rtc-loop-task]} + (create-rtc-loop user-uuid graph-uuid repo conn date-formatter nil {:debug-ws-url debug-ws-url}) + c (rtc-loop-task #(js/console.log :succ %) #(js/console.log :fail %))] + (def cancel c) + (def rtc-log-flow rtc-log-flow) + (def rtc-state-flow rtc-state-flow) + (def *rtc-auto-push? *rtc-auto-push?))) + (cancel) + (def cancel2 ((m/reduce (fn [_ v] (prn :v v) v) + (m/latest vector rtc-state-flow (m/reductions {} nil rtc-log-flow))) + #(js/console.log :succ %) #(js/console.log :fail %))) + (cancel2)) diff --git a/src/main/frontend/worker/rtc/ws2.cljs b/src/main/frontend/worker/rtc/ws2.cljs index d4cb1222b..668732be1 100644 --- a/src/main/frontend/worker/rtc/ws2.cljs +++ b/src/main/frontend/worker/rtc/ws2.cljs @@ -171,17 +171,22 @@ (:recv-flow m-ws))) (defn send&recv - "Return a task: send message wait to recv its response and return it" + "Return a task: send message wait to recv its response and return it. + Throw if timeout" [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}] (assert (pos-int? timeout-ms)) (let [req-id (str (random-uuid)) message (assoc message :req-id req-id)] (m/sp - (let [mws (m/? (send mws message))] - (m/? (m/timeout - (m/reduce - (fn [_ v] - (when (= req-id (:req-id v)) - (reduced v))) - (recv-flow mws)) - timeout-ms)))))) + (let [mws (m/? (send mws message)) + result (m/? + (m/timeout + (m/reduce + (fn [_ v] + (when (= req-id (:req-id v)) + (reduced v))) + (recv-flow mws)) + timeout-ms))] + (when-not result + (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true}))) + result))))