diff --git a/src/main/frontend/fs/sync.cljs b/src/main/frontend/fs/sync.cljs index 4559fea4f..9eba427bc 100644 --- a/src/main/frontend/fs/sync.cljs +++ b/src/main/frontend/fs/sync.cljs @@ -77,7 +77,28 @@ (s/def ::TXContent string?) (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent])) +(s/def ::succ-map #(= {:succ true} %)) +(s/def ::unknown-map (comp some? :unknown)) +(s/def ::stop-map #(= {:stop true} %)) +(s/def ::need-sync-remote #(= {:need-sync-remote true} %)) +(s/def ::sync-local->remote!-result + (s/or :succ ::succ-map + :need-sync-remote ::need-sync-remote + :unknown ::unknown-map)) + +(s/def ::sync-remote->local!-result + (s/or :succ ::succ-map + :need-remote->local-full-sync + #(= {:need-remote->local-full-sync true} %) + :stop ::stop-map + :unknown ::unknown-map)) + +(s/def ::sync-local->remote-all-files!-result + (s/or :succ ::succ-map + :stop ::stop-map + :need-sync-remote ::need-sync-remote + :unknown ::unknown-map)) (def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s") @@ -161,8 +182,8 @@ [path] (let [parts (string/split path "/")] (if (and (< 2 (count parts)) - (= 36 (count (parts 0))) - (= 36 (count (parts 1)))) + (= 36 (count (parts 0))) + (= 36 (count (parts 1)))) (string/join "/" (drop 2 parts)) path))) @@ -283,11 +304,11 @@ NOTE: this xf should apply on reversed diffs sequence (sort by txid)" [n] (comp - (map diff->filetxns) - cat - distinct-update-filetxns-xf - remove-deleted-filetxns-xf - (partition-filetxns n))) + (map diff->filetxns) + cat + distinct-update-filetxns-xf + remove-deleted-filetxns-xf + (partition-filetxns n))) (defn- filepath->diff [index {:keys [relative-path user-uuid graph-uuid]}] @@ -307,7 +328,6 @@ (map-indexed filepath->diff) (diffs->partitioned-filetxns n))) - (deftype FileMetadata [size etag path last-modified remote? ^:mutable normalized-path] Object (get-normalized-path [_] @@ -460,10 +480,8 @@ (retry-rsapi #(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path filepaths local-txid token)))))))) - (def rsapi (->RSAPI)) - (deftype RemoteAPI [] Object @@ -555,9 +573,9 @@ :Transactions (as-> txns (sort-by :TXId txns) - [txns - (:TXId (last txns)) - (:TXId (first txns))])))))) + [txns + (:TXId (last txns)) + (:TXId (first txns))])))))) (create-graph [this graph-name] (.request this "create_graph" {:GraphName graph-name})) @@ -652,6 +670,8 @@ ;; type = "change" | "add" | "unlink" + + (deftype FileChangeEvent [type dir path stat] IRelativePath (-relative-path [_] (remove-dir-prefix dir path)) @@ -706,8 +726,6 @@ (sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal. if local-txid != remote-txid, return {:need-sync-remote true}")) - - (deftype Remote->LocalSyncer [graph-uuid base-path repo *txid *sync-state ^:mutable local->remote-syncer *stopped] Object @@ -789,12 +807,11 @@ latest-txid (:TXId (local)]" - (count diff-remote-files) "files need to sync") + (count diff-remote-files) "files need to sync") (local! this (map -relative-path diff-remote-files) latest-txid)))))) - (defn- file-changed? "return true when file changed compared with remote" [graph-uuid file-path-without-base-path base-path] @@ -866,12 +883,13 @@ (async/close! c))))) c)) - (sync-local->remote! [this es] - (if (empty? es) - {:succ true} - (let [type (.-type ^FileChangeEvent (first es)) - ignore-files (get-ignore-files this) - es->paths-xf (comp + + (sync-local->remote! [this es] + (if (empty? es) + (go {:succ true}) + (let [type (.-type ^FileChangeEvent (first es)) + ignore-files (get-ignore-files this) + es->paths-xf (comp (map #(relative-path %)) (filter #(not (contains-path? ignore-files %)))) paths (sequence es->paths-xf es)] @@ -932,18 +950,19 @@ (if stopped {:stop true} (if (empty? es-partitions) - {:succ true} - (let [{:keys [succ need-sync-remote unknown] :as r} - (remote! this (first es-partitions)))] - (cond - succ - (recur (next es-partitions)) - + {:succ true} + (let [{:keys [succ need-sync-remote unknown] :as r} + (remote! this (first es-partitions)))] + (s/assert ::sync-local->remote!-result r) + (cond + succ + (recur (next es-partitions)) (or need-sync-remote unknown) r))))))))) ;;; sync state + (defn sync-state "create a new sync-state" [] @@ -1003,6 +1022,7 @@ ;;; put all stuff together + (deftype ^:large-vars/cleanup-todo SyncManager [graph-uuid base-path *sync-state ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer @@ -1067,8 +1087,9 @@ (full-sync [this] (go - (let [{:keys [succ need-sync-remote unknown stop]} + (let [{:keys [succ need-sync-remote unknown stop] :as r} (remote-all-files! local->remote-syncer))] + (s/assert ::sync-local->remote-all-files!-result r) (cond succ (.schedule this ::idle nil) @@ -1099,8 +1120,9 @@ (go (if (some-> remote-val :txid (<= @*txid)) (.schedule this ::idle nil) - (let [{:keys [succ unknown stop need-remote->local-full-sync]} + (let [{:keys [succ unknown stop need-remote->local-full-sync] :as r} (local! remote->local-syncer))] + (s/assert ::sync-remote->local!-result r) (cond need-remote->local-full-sync (.schedule this ::remote->local-full-sync=>local->remote-full-sync nil) @@ -1115,8 +1137,9 @@ (local->remote [this {^FileChangeEvents local-change :local}] (assert (some? local-change) local-change) (go - (let [{:keys [succ need-sync-remote unknown]} + (let [{:keys [succ need-sync-remote unknown] :as r} (remote! local->remote-syncer [local-change]))] + (s/assert ::sync-local->remote!-result r) (cond succ (.schedule this ::idle nil) @@ -1139,7 +1162,6 @@ (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]) (swap! *sync-state sync-state--update-state ::stop)))) - (defn sync-manager [graph-uuid base-path repo txid *sync-state full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan local-changes-chan] (let [*txid (atom txid) @@ -1157,13 +1179,11 @@ full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false))) - (def full-sync-chan (chan 1)) (def stop-sync-chan (chan 1)) (def remote->local-sync-chan (chan)) (def local->remote-sync-chan (chan)) - (defn sync-stop [] (when-let [sm (state/get-file-sync-manager)] (println "stopping sync-manager")