From 3517ae9fea9ac3fd553921ee927334a06cfba804 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Sat, 15 Jan 2022 02:03:47 +0800 Subject: [PATCH] feat(sync): support concurrent sync remote->local --- src/main/frontend/fs/sync.cljs | 244 ++++++++++++++++++++++----------- 1 file changed, 163 insertions(+), 81 deletions(-) diff --git a/src/main/frontend/fs/sync.cljs b/src/main/frontend/fs/sync.cljs index 630dd183a..a8dad58a7 100644 --- a/src/main/frontend/fs/sync.cljs +++ b/src/main/frontend/fs/sync.cljs @@ -43,6 +43,11 @@ (def graphs-txid (persist-var/persist-var nil "graphs-txid")) +(defn- update-graphs-txid! [latest-txid graph-uuid repo] + (persist-var/-reset-value! graphs-txid [graph-uuid latest-txid] repo) + (persist-var/persist-save graphs-txid)) + + (defn- ws-stop! [*ws] (swap! *ws (fn [o] (assoc o :stop true))) (.close (:ws @*ws))) @@ -107,6 +112,22 @@ (//path -> path" + [path] + (let [parts (string/split path "/")] + (if (and (< 2 (count parts)) + (= 36 (count (parts 0))) + (= 36 (count (parts 1)))) + (string/join "/" (drop 2 parts)) + path))) + (defprotocol IRelativePath (-relative-path [this])) @@ -115,43 +136,105 @@ (defprotocol IStopped? (-stopped? [this])) ;from-path, to-path is relative path -(deftype FileTxn [from-path to-path updated deleted seq-id] +(deftype FileTxn [from-path to-path updated deleted txid] Object - (rename [_ to] - (FileTxn. from-path to updated false seq-id)) - (update [_] - (FileTxn. from-path to-path true false seq-id)) - (delete [_] - (FileTxn. from-path to-path false true seq-id)) + ;; (rename [_ to] + ;; (FileTxn. from-path to updated false seq-id)) + ;; (update [_] + ;; (FileTxn. from-path to-path true false seq-id)) + ;; (delete [_] + ;; (FileTxn. from-path to-path false true seq-id)) (renamed? [_] (not= from-path to-path)) (updated? [_] updated) (deleted? [_] deleted) IRelativePath - (-relative-path [_] to-path) + (-relative-path [_] (remove-user-graph-uuid-prefix to-path)) IEquiv (-equiv [_ ^FileTxn other] (and (= from-path (.-from-path other)) (= to-path (.-to-path other)) (= updated (.-updated other)) - (= deleted (.-deleted other)) - (= seq-id (.-seq-id other)))) + (= deleted (.-deleted other)))) + IHash + (-hash [_] (hash [from-path to-path updated deleted])) IComparable (-compare [_ ^FileTxn other] - (compare seq-id (.-seq-id other))) + (compare txid (.-txid other))) - ISeqable - (-seq [_] - `([:from-path ~from-path] [:to-path ~to-path] [:updated ~updated] [:deleted ~deleted])) + ;; ISeqable + ;; (-seq [_] + ;; `([:from-path ~from-path] [:to-path ~to-path] [:updated ~updated] [:deleted ~deleted])) IPrintWithWriter (-pr-writer [coll w _opts] (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path "\" (updated? " updated ", renamed? " (.renamed? coll) ", deleted? " (.deleted? coll) - ", seq-id " seq-id ")]"))) + ", txid " txid ")]"))) + +(defn- diff->filetxns + "convert diff(`get-diff`) to `FileTxn`" + [{:keys [TXId TXType TXContent]}] + (let [update? (= "update_files" TXType) + delete? (= "delete_files" TXType) + update-or-del-type-xf + (comp + (remove empty?) + (map #(->FileTxn % % update? delete? TXId))) + filepaths (string/split-lines TXContent)] + (case TXType + ("update_files" "delete_files") + (sequence update-or-del-type-xf filepaths) + + "rename_file" + (list (->FileTxn (first filepaths) (second filepaths) false false TXId))))) + +(defn- distinct-update-filetxns-xf + "transducer. + remove duplicate update&delete `FileTxn`s." + [rf] + (let [seen-update&delete-filetxns (volatile! #{})] + (fn + ([] (rf)) + ([result] (rf result)) + ([result ^FileTxn filetxn] + (if (and + (or (.updated? filetxn) (.deleted? filetxn)) + (contains? @seen-update&delete-filetxns filetxn)) + result + (do (vswap! seen-update&delete-filetxns conj filetxn) + (rf result filetxn))))))) + +(defn- partition-filetxns + "return transducer. + partition filetxns, at most N update-filetxns in each partition, + for delete and rename type, only one filetxn in each partition." + [n] + (comp + (partition-by #(.updated? ^FileTxn %)) + (map (fn [ts] + (if (some-> (first ts) (.updated?)) + (partition-all n ts) + (map list ts)))) + cat)) + +(defn- diffs->partitioned-filetxns + "return transducer. + 1. diff -> `FileTxn` , see also `get-diff` + 2. distinct redundant update type filetxns + 3. partition filetxns, each partition contains same type filetxns, + for update type, at most N items in each partition + for delete & rename type, only 1 item in each partition. + NOTE: this xf should apply on reversed diffs sequence (sort by txid)" + [n] + (comp + (map diff->filetxns) + cat + distinct-update-filetxns-xf + (partition-filetxns n))) (deftype FileNotFoundErr [when file]) @@ -180,12 +263,6 @@ (FileTxnSet. (assoc to-path-file-map to next-file) seq-id)) (FileTxnSet. (assoc to-path-file-map to (->FileTxn to to false true seq-id)) (inc seq-id)))) - (related-files [_] - (->> (vals to-path-file-map) - (map (fn [^FileTxn v] [(.-from-path v) (.-to-path v)])) - (flatten) - (into #{}))) - ILookup (-lookup [coll to-path] (-lookup coll to-path nil)) @@ -204,8 +281,8 @@ (-seq [_] (some-> (vals to-path-file-map) - (sort) - (seq))) + sort + seq)) IPrintWithWriter (-pr-writer [_ w opts] @@ -215,22 +292,6 @@ (set! (.-EMPTY FileTxnSet) (FileTxnSet. {} 0)) -(defn- remove-dir-prefix [dir path] - (let [r (string/replace path (js/RegExp. (str "^" dir)) "")] - (if (string/starts-with? r "/") - (string/replace-first r "/" "") - r))) - -(defn- remove-user-graph-uuid-prefix - "//path -> path" - [path] - (let [parts (string/split path "/")] - (if (and (< 2 (count parts)) - (= 36 (count (parts 0))) - (= 36 (count (parts 1)))) - (string/join "/" (drop 2 parts)) - path))) - (deftype FileMetadata [size etag path last-modified remote? ^:mutable normalized-path] Object (get-normalized-path [_] @@ -257,6 +318,17 @@ (-pr-writer [_ w _opts] (write-all w (str {:size size :etag etag :path path :remote? remote?})))) +(defn- relative-path [o] + (cond + (implements? IRelativePath o) + (-relative-path o) + + (string? o) + (remove-user-graph-uuid-prefix o) + + :else + (throw (js/Error. (str "unsupport type " (type o)))))) + (defprotocol IRSAPI (get-local-files-meta [this graph-uuid base-path filepaths] "get local files' metadata") @@ -473,35 +545,43 @@ (defn update-txns [filetxnset txns] (reduce update-txn filetxnset txns)) -(defn- apply-filetxn [graph-uuid base-path ^FileTxn filetxn] +(defn- apply-filetxns + [graph-uuid base-path filetxns] (cond - (.renamed? filetxn) - (rename-local-file rsapi graph-uuid base-path (.-from-path filetxn) (.-to-path filetxn)) + (.renamed? (first filetxns)) + (let [filetxn (first filetxns)] + (assert (= 1 (count filetxns))) + (rename-local-file rsapi graph-uuid base-path + (relative-path (.-from-path filetxn)) + (relative-path (.-to-path filetxn)))) - (.updated? filetxn) - (update-local-files rsapi graph-uuid base-path [(.-to-path filetxn)]) + (.updated? (first filetxns)) + (update-local-files rsapi graph-uuid base-path (map relative-path filetxns)) - (.deleted? filetxn) - (go - (let [r (local-files! [path])) - r (local-files! [path]))] +(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid] + (go-loop [filetxns-partitions* filetxns-partitions] + (when (seq filetxns-partitions*) + (let [filetxns (first filetxns-partitions*) + paths (map relative-path filetxns) + _ (. sync-state (add-current-remote->local-files! paths)) + r (local-files! paths))] (if (instance? ExceptionInfo r) r - (recur (next filetxns*))))))) + (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))] + (reset! *txid latest-txid) + (update-graphs-txid! latest-txid graph-uuid repo) + (recur (next filetxns-partitions*)))))))) (defmulti need-sync-remote? (fn [v] (cond (= :max v) @@ -588,15 +668,18 @@ r (let [[diff-txns latest-txid] r] (when (number? latest-txid) - (let [filetxnset (update-txns (.-EMPTY FileTxnSet) diff-txns)] - (prn "filetxnset" filetxnset) - ;; TODO: precheck etag - (let [apply-result (partitioned-filetxns 5) + (completing (fn [r i] (conj r (reverse i)))) ;reverse + '() + (reverse diff-txns))] + (prn "partition-filetxns" partitioned-filetxns) + ;; TODO: precheck etag + (let [apply-result + (remote! [this ^FileChangeEvent e] (let [type (.-type e)] - (if (contains-path? (get-ignore-files this) (-relative-path e)) + (if (contains-path? (get-ignore-files this) (relative-path e)) (go {:succ true}) ; ignore (do (prn "sync-local->remote!" e) - (let [path* (-relative-path e) + (let [path* (relative-path e) r (cond (or (= "add" type) (= "change" type)) @@ -703,8 +786,7 @@ (do (println "sync-local->remote! update txid" r*) ;; persist txid - (persist-var/-reset-value! graphs-txid [graph-uuid r*] repo) - (persist-var/persist-save graphs-txid) + (update-graphs-txid! r* graph-uuid repo) (reset! *txid r*) {:succ true}) @@ -725,9 +807,9 @@ change-events (->> diff-local-files (mapv #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil)) - (filterv #(let [relative-path (-relative-path %)] - (and (not (contains-path? ignore-files relative-path)) - (contains-path? monitored-dirs relative-path)))))] + (filterv #(let [path (relative-path %)] + (and (not (contains-path? ignore-files path)) + (contains-path? monitored-dirs path)))))] (println "[full-sync]" (count change-events) "files need to sync to remote") (loop [es change-events] (if (empty? es) @@ -808,7 +890,7 @@ ::full-sync (local=>local->remote - (local this ::local-remote args)) + (local this ::local->remote args)) ::remote->local=>full-sync (local this ::full-sync args)) ::stop