From 85bb55a7843ab7216d4da6394d8ac8f0723edf41 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Sat, 15 Jan 2022 23:34:33 +0800 Subject: [PATCH] feat(sync): support concurrent full-sync local->remote --- resources/package.json | 3 +- src/electron/electron/file_sync_rsapi.cljs | 2 +- src/main/frontend/fs/sync.cljs | 117 ++++++++++++--------- static/yarn.lock | 30 ++++++ 4 files changed, 101 insertions(+), 51 deletions(-) diff --git a/resources/package.json b/resources/package.json index 266127a73..27fcab298 100644 --- a/resources/package.json +++ b/resources/package.json @@ -35,7 +35,8 @@ "diff-match-patch": "1.0.5", "https-proxy-agent": "5.0.0", "@sentry/electron": "2.5.1", - "posthog-js": "1.10.2" + "posthog-js": "1.10.2", + "@andelf/rsapi": "0.0.2" }, "devDependencies": { "@electron-forge/cli": "^6.0.0-beta.57", diff --git a/src/electron/electron/file_sync_rsapi.cljs b/src/electron/electron/file_sync_rsapi.cljs index 8f5d50055..22de69002 100644 --- a/src/electron/electron/file_sync_rsapi.cljs +++ b/src/electron/electron/file_sync_rsapi.cljs @@ -1,5 +1,5 @@ (ns electron.file-sync-rsapi - (:require ["rsapi" :as rsapi])) + (:require ["@andelf/rsapi" :as rsapi])) (defn get-local-files-meta [graph-uuid base-path file-paths] (rsapi/getLocalFilesMeta graph-uuid base-path (clj->js file-paths))) diff --git a/src/main/frontend/fs/sync.cljs b/src/main/frontend/fs/sync.cljs index a8dad58a7..77db13b30 100644 --- a/src/main/frontend/fs/sync.cljs +++ b/src/main/frontend/fs/sync.cljs @@ -619,6 +619,7 @@ (def local-changes-chan (chan 100)) +;;; type = "change" | "add" | "unlink" (deftype FileChangeEvent [type dir path stat] IRelativePath (-relative-path [_] (remove-dir-prefix dir path)) @@ -633,6 +634,20 @@ (-pr-writer [_ w _opts] (write-all w (str {:type type :base-path dir :path path})))) +(defn- partition-file-change-events + "return transducer. + partition `FileChangeEvent`s, at most N file-change-events in each partition. + only one type in a partition." + [n] + (comp + (partition-by (fn [^FileChangeEvent e] + (case (.-type e) + ("add" "change") :add-or-change + "unlink" :unlink))) + (map #(partition-all n %)) + cat)) + + (defn file-watch-handler [type {:keys [dir path _content stat] :as _payload}] (go @@ -652,7 +667,7 @@ (stop-local->remote! [this]) (ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN, return chan returning events with rate limited") - (sync-local->remote! [this ^FileChangeEvent e]) + (sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.") (sync-local->remote-all-files! [this stop-chan] "compare all local files to remote ones, sync if not equal. ensure local-txid = remote-txid before calling this func")) @@ -756,44 +771,42 @@ (async/close! c))))) c)) - (sync-local->remote! [this ^FileChangeEvent e] - (let [type (.-type e)] - (if (contains-path? (get-ignore-files this) (relative-path e)) - (go {:succ true}) ; ignore - (do - (prn "sync-local->remote!" e) - (let [path* (relative-path e) - r - (cond - (or (= "add" type) (= "change" type)) - (update-remote-file rsapi graph-uuid base-path path* @*txid) + (sync-local->remote! [this es] + (if (empty? es) + {:succ true} + (let [type (.-type ^FileChangeEvent (first es)) + ignore-files (get-ignore-files this) + es->paths-xf (comp + (map #(relative-path %)) + (filter #(not (contains-path? ignore-files %)))) + paths (sequence es->paths-xf es)] + (println "sync-local->remote" paths) + (let [r (case type + ("add" "change") + (update-remote-files rsapi graph-uuid base-path paths @*txid) - (= "unlink" type) - (delete-remote-files rsapi graph-uuid base-path [path*] @*txid) + "unlink" + (delete-remote-files rsapi graph-uuid base-path paths @*txid))] + (go + (let [_ (.add-current-local->remote-files! sync-state paths) + r* (remote-files! sync-state paths)] + (cond + (need-sync-remote? r*) + {:need-sync-remote true} - ;; (= "rename" type) - ;; (rename-local-file) - )] - (go - (let [_ (.add-current-local->remote-files! sync-state [path*]) - r* (remote-files! sync-state [path*])] - (cond - (need-sync-remote? r*) - {:need-sync-remote true} + (number? r*) ; succ + (do + (println "sync-local->remote! update txid" r*) + ;; persist txid + (update-graphs-txid! r* graph-uuid repo) + (reset! *txid r*) + {:succ true}) - (number? r*) ; succ - (do - (println "sync-local->remote! update txid" r*) - ;; persist txid - (update-graphs-txid! r* graph-uuid repo) - (reset! *txid r*) - {:succ true}) - - :else - (do - (println "sync-local->remote unknown:" r*) - {:unknown r*}))))))))) + :else + (do + (println "sync-local->remote unknown:" r*) + {:unknown r*})))))))) (sync-local->remote-all-files! [this stop-chan] (go @@ -804,23 +817,29 @@ diff-local-files (set/difference local-all-files-meta remote-all-files-meta) ignore-files (get-ignore-files this) monitored-dirs (get-monitored-dirs this) - change-events (->> diff-local-files - (mapv - #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil)) - (filterv #(let [path (relative-path %)] - (and (not (contains-path? ignore-files path)) - (contains-path? monitored-dirs path)))))] - (println "[full-sync]" (count change-events) "files need to sync to remote") - (loop [es change-events] - (if (empty? es) + change-events-partitions + (sequence + (comp + ;; convert to FileChangeEvent + (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil)) + ;; filter ignore-files & monitored-dirs + (filter #(let [path (relative-path %)] + (and (not (contains-path? ignore-files path)) + (contains-path? monitored-dirs path)))) + ;; partition FileChangeEvents + (partition-file-change-events 5)) + diff-local-files)] + (println "[full-sync]" (count (flatten change-events-partitions)) "files need to sync to remote") + (loop [es-partitions change-events-partitions] + (if (empty? es-partitions) {:succ true} (if (async/poll! stop-chan) {:stop true} - (let [e (first es) - {:keys [succ need-sync-remote unknown] :as r} (remote! this e))] + (let [{:keys [succ need-sync-remote unknown] :as r} + (remote! this (first es-partitions)))] (cond succ - (recur (next es)) + (recur (next es-partitions)) (or need-sync-remote unknown) r))))))))) @@ -960,7 +979,7 @@ (assert (some? local-change)) (go (let [{:keys [succ need-sync-remote unknown]} - (remote! local->remote-syncer local-change))] + (remote! local->remote-syncer [local-change]))] (cond succ (.schedule this ::idle) diff --git a/static/yarn.lock b/static/yarn.lock index 4111ad656..623ea8beb 100644 --- a/static/yarn.lock +++ b/static/yarn.lock @@ -12,6 +12,36 @@ resolved "https://registry.yarnpkg.com/7zip-bin/-/7zip-bin-5.1.1.tgz#9274ec7460652f9c632c59addf24efb1684ef876" integrity sha512-sAP4LldeWNz0lNzmTird3uWfFDWWTeg6V/MsmyyLR9X1idwKBWIgt/ZvinqQldJm3LecKEs1emkbquO6PCiLVQ== +"@andelf/rsapi-darwin-arm64@0.0.2": + version "0.0.2" + resolved "https://registry.npmmirror.com/@andelf/rsapi-darwin-arm64/download/@andelf/rsapi-darwin-arm64-0.0.2.tgz#c89a025fbc2e632ef1ed2a96fa64787e29e4bab8" + integrity sha512-WfsVD+5EZkGD7NMyJ0Akwrowgf3IF2BcckERwx0PHQzHi09NZLxZtMr2NN2s6q8QOvGUfbqHYEATZ8zaZMU7XQ== + +"@andelf/rsapi-darwin-x64@0.0.2": + version "0.0.2" + resolved "https://registry.npmmirror.com/@andelf/rsapi-darwin-x64/download/@andelf/rsapi-darwin-x64-0.0.2.tgz#d9f71946122dc4bb056e512fa9fc820d30c7159b" + integrity sha512-r5F9odlNt0dD2W4eKYlJnqqoim/5kn/lzch7kHmvR9SKbxIe8jn/XwGqk5Kj5zIlhwpksT24dF47ppnZ8gJZRw== + +"@andelf/rsapi-linux-x64-gnu@0.0.2": + version "0.0.2" + resolved "https://registry.npmmirror.com/@andelf/rsapi-linux-x64-gnu/download/@andelf/rsapi-linux-x64-gnu-0.0.2.tgz#f90d8402cbb7e07fcc5defc5c3743d2a591cc22b" + integrity sha512-olB2TYRzqB5e8brtyvj0+YnbdLyWCvsQjbDqV8nKyvUThSEvBPUgREBigdpb26wSETPCX7q/IEQNDDOLdXjzmQ== + +"@andelf/rsapi-win32-x64-msvc@0.0.2": + version "0.0.2" + resolved "https://registry.npmmirror.com/@andelf/rsapi-win32-x64-msvc/download/@andelf/rsapi-win32-x64-msvc-0.0.2.tgz#cee77358a4a3601d8e5a6aa5199ef537eadbd6f3" + integrity sha512-qgVZY4lnnlVFu5cNV/pEkvlI/hGokpfpIAGAipYLKVcAmuZ/H9+3aD8PcSi4uRIuexFlSBIcNBi7FtnNahMUcA== + +"@andelf/rsapi@0.0.2": + version "0.0.2" + resolved "https://registry.npmmirror.com/@andelf/rsapi/download/@andelf/rsapi-0.0.2.tgz#8b24e9792123131f5f4529f00fb8f380333acc26" + integrity sha512-/CVyYM3+fRFvO6Dqyd8TJv6IOZsaFpKAkX07Vs2FbnSzR50XtGujYpcjyKjmq3zLPmtJYuvD80qwKqixvzCPPw== + optionalDependencies: + "@andelf/rsapi-darwin-arm64" "0.0.2" + "@andelf/rsapi-darwin-x64" "0.0.2" + "@andelf/rsapi-linux-x64-gnu" "0.0.2" + "@andelf/rsapi-win32-x64-msvc" "0.0.2" + "@develar/schema-utils@~2.1.0": version "2.1.0" resolved "https://registry.yarnpkg.com/@develar/schema-utils/-/schema-utils-2.1.0.tgz#eceb1695bfbed6f6bb84666d5d3abe5e1fd54e17"