refactor(rtc): ws2 support s3-presign-url resp

pull/11293/head
rcmerci 2024-05-05 14:30:15 +08:00
parent cad9602d48
commit 9bd1b41441
3 changed files with 58 additions and 29 deletions

View File

@ -1,7 +1,8 @@
(ns logseq.common.missionary-util
"Utils based on missionary."
(:import [missionary Cancelled])
(:require [missionary.core :as m]))
(:require [clojure.core.async :as a]
[missionary.core :as m])
(:import [missionary Cancelled]))
(def delays (reductions * 1000 (repeat 2)))
@ -55,3 +56,14 @@
"Return the canceler"
[task key & {:keys [succ fail]}]
(task (or succ #(prn key :succ %)) (or fail #(js/console.log key (or (some-> % .-stack) %)))))
(defn >!
"Return a task that
puts given value on given channel,
completing with true when put is accepted, or false if port was closed."
[c x] (doto (m/dfv) (->> (a/put! c x))))
(defn <!
"Return a task that takes from given channel,
completing with value when take is accepted, or nil if port was closed."
[c] (doto (m/dfv) (->> (a/take! c))))

View File

@ -337,7 +337,4 @@
(def *rtc-auto-push? *rtc-auto-push?)))
(cancel)
)

View File

@ -2,7 +2,8 @@
"Websocket wrapped by missionary.
based on
https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs"
(:require [frontend.worker.rtc.const :as rtc-const]
(:require [cljs-http.client :as http]
[frontend.worker.rtc.const :as rtc-const]
[logseq.common.missionary-util :as c.m]
[missionary.core :as m]))
@ -89,12 +90,12 @@
(c.m/backoff
(take retry-count c.m/delays)
(m/sp
(try
(m/? (m/timeout (create-mws* url) open-ws-timeout))
(catch js/CloseEvent e
(throw (ex-info "failed to open websocket conn"
{:missionary/retry true}
e)))))))
(try
(m/? (m/timeout (create-mws* url) open-ws-timeout))
(catch js/CloseEvent e
(throw (ex-info "failed to open websocket conn"
{:missionary/retry true}
e)))))))
(comment
(defn get-mws-create
"Returns a task to get a mws(missionary-websocket), creating one if needed.
@ -171,23 +172,42 @@
(map rtc-const/data-from-ws-coercer)
(:recv-flow m-ws)))
(defn send&recv
(defn- send&recv*
"Return a task: send message wait to recv its response and return it.
Throw if timeout"
[mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
(assert (pos-int? timeout-ms))
(let [req-id (str (random-uuid))
message (assoc message :req-id req-id)]
(m/sp
(let [mws (m/? (send mws message))
result (m/?
(m/timeout
(m/reduce
(fn [_ v]
(when (= req-id (:req-id v))
(reduced v)))
(recv-flow mws))
timeout-ms))]
(when-not result
(throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true})))
result))))
{:pre [(pos-int? timeout-ms)
(some? (:req-id message))]}
(m/sp
(let [mws (m/? (send mws message))
req-id (:req-id message)
result (m/?
(m/timeout
(m/reduce
(fn [_ v]
(when (= req-id (:req-id v))
(reduced v)))
(recv-flow mws))
timeout-ms))]
(when-not result
(throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true})))
result)))
(defn send&recv
"Return a task that send the message then wait to recv its response,
if the response has kv `:s3-presign-url`, fetch the response from s3,
else just return the response.
Throw if timeout"
[mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
(m/sp
(let [req-id (str (random-uuid))
message (assoc message :req-id req-id)
resp (m/? (send&recv* mws message :timeout-ms timeout-ms))]
(if-let [s3-presign-url (:s3-presign-url resp)]
(let [{:keys [status body]} (m/? (c.m/<! (http/get s3-presign-url {:with-credentials? false})))]
(if (http/unexceptional-status? status)
(js->clj (js/JSON.parse body) :keywordize-keys true)
{:req-id req-id
:ex-message "get s3 object failed"
:ex-data {:type :get-s3-object-failed :status status :body body}}))
resp))))