diff --git a/deps/common/src/logseq/common/missionary_util.cljs b/deps/common/src/logseq/common/missionary_util.cljs index 52108e208..edd84f803 100644 --- a/deps/common/src/logseq/common/missionary_util.cljs +++ b/deps/common/src/logseq/common/missionary_util.cljs @@ -1,7 +1,8 @@ (ns logseq.common.missionary-util "Utils based on missionary." - (:import [missionary Cancelled]) - (:require [missionary.core :as m])) + (:require [clojure.core.async :as a] + [missionary.core :as m]) + (:import [missionary Cancelled])) (def delays (reductions * 1000 (repeat 2))) @@ -55,3 +56,14 @@ "Return the canceler" [task key & {:keys [succ fail]}] (task (or succ #(prn key :succ %)) (or fail #(js/console.log key (or (some-> % .-stack) %))))) + +(defn >! + "Return a task that + puts given value on given channel, + completing with true when put is accepted, or false if port was closed." + [c x] (doto (m/dfv) (->> (a/put! c x)))) + +(defn > (a/take! c)))) diff --git a/src/main/frontend/worker/rtc/core2.cljs b/src/main/frontend/worker/rtc/core2.cljs index 729ef2a0e..f84a3a023 100644 --- a/src/main/frontend/worker/rtc/core2.cljs +++ b/src/main/frontend/worker/rtc/core2.cljs @@ -337,7 +337,4 @@ (def *rtc-auto-push? *rtc-auto-push?))) (cancel) - - - ) diff --git a/src/main/frontend/worker/rtc/ws2.cljs b/src/main/frontend/worker/rtc/ws2.cljs index 69b6f5ce0..fcbb9f4c0 100644 --- a/src/main/frontend/worker/rtc/ws2.cljs +++ b/src/main/frontend/worker/rtc/ws2.cljs @@ -2,7 +2,8 @@ "Websocket wrapped by missionary. based on https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs" - (:require [frontend.worker.rtc.const :as rtc-const] + (:require [cljs-http.client :as http] + [frontend.worker.rtc.const :as rtc-const] [logseq.common.missionary-util :as c.m] [missionary.core :as m])) @@ -89,12 +90,12 @@ (c.m/backoff (take retry-count c.m/delays) (m/sp - (try - (m/? (m/timeout (create-mws* url) open-ws-timeout)) - (catch js/CloseEvent e - (throw (ex-info "failed to open websocket conn" - {:missionary/retry true} - e))))))) + (try + (m/? (m/timeout (create-mws* url) open-ws-timeout)) + (catch js/CloseEvent e + (throw (ex-info "failed to open websocket conn" + {:missionary/retry true} + e))))))) (comment (defn get-mws-create "Returns a task to get a mws(missionary-websocket), creating one if needed. @@ -171,23 +172,42 @@ (map rtc-const/data-from-ws-coercer) (:recv-flow m-ws))) -(defn send&recv +(defn- send&recv* "Return a task: send message wait to recv its response and return it. Throw if timeout" [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}] - (assert (pos-int? timeout-ms)) - (let [req-id (str (random-uuid)) - message (assoc message :req-id req-id)] - (m/sp - (let [mws (m/? (send mws message)) - result (m/? - (m/timeout - (m/reduce - (fn [_ v] - (when (= req-id (:req-id v)) - (reduced v))) - (recv-flow mws)) - timeout-ms))] - (when-not result - (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true}))) - result)))) + {:pre [(pos-int? timeout-ms) + (some? (:req-id message))]} + (m/sp + (let [mws (m/? (send mws message)) + req-id (:req-id message) + result (m/? + (m/timeout + (m/reduce + (fn [_ v] + (when (= req-id (:req-id v)) + (reduced v))) + (recv-flow mws)) + timeout-ms))] + (when-not result + (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true}))) + result))) + +(defn send&recv + "Return a task that send the message then wait to recv its response, + if the response has kv `:s3-presign-url`, fetch the response from s3, + else just return the response. + Throw if timeout" + [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}] + (m/sp + (let [req-id (str (random-uuid)) + message (assoc message :req-id req-id) + resp (m/? (send&recv* mws message :timeout-ms timeout-ms))] + (if-let [s3-presign-url (:s3-presign-url resp)] + (let [{:keys [status body]} (m/? (c.m/clj (js/JSON.parse body) :keywordize-keys true) + {:req-id req-id + :ex-message "get s3 object failed" + :ex-data {:type :get-s3-object-failed :status status :body body}})) + resp))))