feat(sync): support concurrent sync remote->local

pull/3839/head
rcmerci 2022-01-15 02:03:47 +08:00
parent b4f0405bbc
commit 3517ae9fea
1 changed files with 163 additions and 81 deletions

View File

@ -43,6 +43,11 @@
(def graphs-txid (persist-var/persist-var nil "graphs-txid"))
(defn- update-graphs-txid! [latest-txid graph-uuid repo]
(persist-var/-reset-value! graphs-txid [graph-uuid latest-txid] repo)
(persist-var/persist-save graphs-txid))
(defn- ws-stop! [*ws]
(swap! *ws (fn [o] (assoc o :stop true)))
(.close (:ws @*ws)))
@ -107,6 +112,22 @@
(<! (request api-name body token refresh-token-fn (inc retry-count)))))
(:resp resp))))))
(defn- remove-dir-prefix [dir path]
(let [r (string/replace path (js/RegExp. (str "^" dir)) "")]
(if (string/starts-with? r "/")
(string/replace-first r "/" "")
r)))
(defn- remove-user-graph-uuid-prefix
"<user-uuid>/<graph-uuid>/path -> path"
[path]
(let [parts (string/split path "/")]
(if (and (< 2 (count parts))
(= 36 (count (parts 0)))
(= 36 (count (parts 1))))
(string/join "/" (drop 2 parts))
path)))
(defprotocol IRelativePath
(-relative-path [this]))
@ -115,43 +136,105 @@
(defprotocol IStopped?
(-stopped? [this]))
;from-path, to-path is relative path
(deftype FileTxn [from-path to-path updated deleted seq-id]
(deftype FileTxn [from-path to-path updated deleted txid]
Object
(rename [_ to]
(FileTxn. from-path to updated false seq-id))
(update [_]
(FileTxn. from-path to-path true false seq-id))
(delete [_]
(FileTxn. from-path to-path false true seq-id))
;; (rename [_ to]
;; (FileTxn. from-path to updated false seq-id))
;; (update [_]
;; (FileTxn. from-path to-path true false seq-id))
;; (delete [_]
;; (FileTxn. from-path to-path false true seq-id))
(renamed? [_]
(not= from-path to-path))
(updated? [_] updated)
(deleted? [_] deleted)
IRelativePath
(-relative-path [_] to-path)
(-relative-path [_] (remove-user-graph-uuid-prefix to-path))
IEquiv
(-equiv [_ ^FileTxn other]
(and (= from-path (.-from-path other))
(= to-path (.-to-path other))
(= updated (.-updated other))
(= deleted (.-deleted other))
(= seq-id (.-seq-id other))))
(= deleted (.-deleted other))))
IHash
(-hash [_] (hash [from-path to-path updated deleted]))
IComparable
(-compare [_ ^FileTxn other]
(compare seq-id (.-seq-id other)))
(compare txid (.-txid other)))
ISeqable
(-seq [_]
`([:from-path ~from-path] [:to-path ~to-path] [:updated ~updated] [:deleted ~deleted]))
;; ISeqable
;; (-seq [_]
;; `([:from-path ~from-path] [:to-path ~to-path] [:updated ~updated] [:deleted ~deleted]))
IPrintWithWriter
(-pr-writer [coll w _opts]
(write-all w "#FileTxn[\"" from-path "\" -> \"" to-path
"\" (updated? " updated ", renamed? " (.renamed? coll) ", deleted? " (.deleted? coll)
", seq-id " seq-id ")]")))
", txid " txid ")]")))
(defn- diff->filetxns
"convert diff(`get-diff`) to `FileTxn`"
[{:keys [TXId TXType TXContent]}]
(let [update? (= "update_files" TXType)
delete? (= "delete_files" TXType)
update-or-del-type-xf
(comp
(remove empty?)
(map #(->FileTxn % % update? delete? TXId)))
filepaths (string/split-lines TXContent)]
(case TXType
("update_files" "delete_files")
(sequence update-or-del-type-xf filepaths)
"rename_file"
(list (->FileTxn (first filepaths) (second filepaths) false false TXId)))))
(defn- distinct-update-filetxns-xf
"transducer.
remove duplicate update&delete `FileTxn`s."
[rf]
(let [seen-update&delete-filetxns (volatile! #{})]
(fn
([] (rf))
([result] (rf result))
([result ^FileTxn filetxn]
(if (and
(or (.updated? filetxn) (.deleted? filetxn))
(contains? @seen-update&delete-filetxns filetxn))
result
(do (vswap! seen-update&delete-filetxns conj filetxn)
(rf result filetxn)))))))
(defn- partition-filetxns
"return transducer.
partition filetxns, at most N update-filetxns in each partition,
for delete and rename type, only one filetxn in each partition."
[n]
(comp
(partition-by #(.updated? ^FileTxn %))
(map (fn [ts]
(if (some-> (first ts) (.updated?))
(partition-all n ts)
(map list ts))))
cat))
(defn- diffs->partitioned-filetxns
"return transducer.
1. diff -> `FileTxn` , see also `get-diff`
2. distinct redundant update type filetxns
3. partition filetxns, each partition contains same type filetxns,
for update type, at most N items in each partition
for delete & rename type, only 1 item in each partition.
NOTE: this xf should apply on reversed diffs sequence (sort by txid)"
[n]
(comp
(map diff->filetxns)
cat
distinct-update-filetxns-xf
(partition-filetxns n)))
(deftype FileNotFoundErr [when file])
@ -180,12 +263,6 @@
(FileTxnSet. (assoc to-path-file-map to next-file) seq-id))
(FileTxnSet. (assoc to-path-file-map to (->FileTxn to to false true seq-id)) (inc seq-id))))
(related-files [_]
(->> (vals to-path-file-map)
(map (fn [^FileTxn v] [(.-from-path v) (.-to-path v)]))
(flatten)
(into #{})))
ILookup
(-lookup [coll to-path]
(-lookup coll to-path nil))
@ -204,8 +281,8 @@
(-seq [_]
(some->
(vals to-path-file-map)
(sort)
(seq)))
sort
seq))
IPrintWithWriter
(-pr-writer [_ w opts]
@ -215,22 +292,6 @@
(set! (.-EMPTY FileTxnSet) (FileTxnSet. {} 0))
(defn- remove-dir-prefix [dir path]
(let [r (string/replace path (js/RegExp. (str "^" dir)) "")]
(if (string/starts-with? r "/")
(string/replace-first r "/" "")
r)))
(defn- remove-user-graph-uuid-prefix
"<user-uuid>/<graph-uuid>/path -> path"
[path]
(let [parts (string/split path "/")]
(if (and (< 2 (count parts))
(= 36 (count (parts 0)))
(= 36 (count (parts 1))))
(string/join "/" (drop 2 parts))
path)))
(deftype FileMetadata [size etag path last-modified remote? ^:mutable normalized-path]
Object
(get-normalized-path [_]
@ -257,6 +318,17 @@
(-pr-writer [_ w _opts]
(write-all w (str {:size size :etag etag :path path :remote? remote?}))))
(defn- relative-path [o]
(cond
(implements? IRelativePath o)
(-relative-path o)
(string? o)
(remove-user-graph-uuid-prefix o)
:else
(throw (js/Error. (str "unsupport type " (type o))))))
(defprotocol IRSAPI
(get-local-files-meta [this graph-uuid base-path filepaths] "get local files' metadata")
@ -473,35 +545,43 @@
(defn update-txns [filetxnset txns]
(reduce update-txn filetxnset txns))
(defn- apply-filetxn [graph-uuid base-path ^FileTxn filetxn]
(defn- apply-filetxns
[graph-uuid base-path filetxns]
(cond
(.renamed? filetxn)
(rename-local-file rsapi graph-uuid base-path (.-from-path filetxn) (.-to-path filetxn))
(.renamed? (first filetxns))
(let [filetxn (first filetxns)]
(assert (= 1 (count filetxns)))
(rename-local-file rsapi graph-uuid base-path
(relative-path (.-from-path filetxn))
(relative-path (.-to-path filetxn))))
(.updated? filetxn)
(update-local-files rsapi graph-uuid base-path [(.-to-path filetxn)])
(.updated? (first filetxns))
(update-local-files rsapi graph-uuid base-path (map relative-path filetxns))
(.deleted? filetxn)
(go
(let [r (<! (delete-local-files rsapi graph-uuid base-path [(.-to-path filetxn)]))]
(if (and (instance? ExceptionInfo r)
(string/index-of (str (ex-cause r)) "No such file or directory"))
true
r)))))
(.deleted? (first filetxns))
(let [filetxn (first filetxns)]
(assert (= 1 (count filetxns)))
(go
(let [r (<! (delete-local-files rsapi graph-uuid base-path [(relative-path filetxn)]))]
(if (and (instance? ExceptionInfo r)
(string/index-of (str (ex-cause r)) "No such file or directory"))
true
r))))))
(defn- apply-filetxns [^SyncState sync-state graph-uuid base-path filetxns]
(go-loop [filetxns* filetxns]
(when (seq filetxns*)
(let [filetxn (first filetxns*)
path (.-to-path filetxn)
_ (. sync-state (add-current-remote->local-files! [path]))
r (<! (apply-filetxn graph-uuid base-path filetxn))
_ (. sync-state (remove-current-remote->local-files! [path]))]
(defn- apply-filetxns-partitions [^SyncState sync-state graph-uuid base-path filetxns-partitions repo *txid]
(go-loop [filetxns-partitions* filetxns-partitions]
(when (seq filetxns-partitions*)
(let [filetxns (first filetxns-partitions*)
paths (map relative-path filetxns)
_ (. sync-state (add-current-remote->local-files! paths))
r (<! (apply-filetxns graph-uuid base-path filetxns))
_ (. sync-state (remove-current-remote->local-files! paths))]
(if (instance? ExceptionInfo r)
r
(recur (next filetxns*)))))))
(let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))]
(reset! *txid latest-txid)
(update-graphs-txid! latest-txid graph-uuid repo)
(recur (next filetxns-partitions*))))))))
(defmulti need-sync-remote? (fn [v] (cond
(= :max v)
@ -588,15 +668,18 @@
r
(let [[diff-txns latest-txid] r]
(when (number? latest-txid)
(let [filetxnset (update-txns (.-EMPTY FileTxnSet) diff-txns)]
(prn "filetxnset" filetxnset)
;; TODO: precheck etag
(let [apply-result (<! (apply-filetxns sync-state graph-uuid base-path filetxnset))]
(let [partitioned-filetxns (transduce (diffs->partitioned-filetxns 5)
(completing (fn [r i] (conj r (reverse i)))) ;reverse
'()
(reverse diff-txns))]
(prn "partition-filetxns" partitioned-filetxns)
;; TODO: precheck etag
(let [apply-result
(<! (apply-filetxns-partitions
sync-state graph-uuid base-path partitioned-filetxns repo *txid))]
(when-not (instance? ExceptionInfo apply-result)
(reset! *txid latest-txid)
;; persist txid
(persist-var/-reset-value! graphs-txid [graph-uuid latest-txid] repo)
(persist-var/persist-save graphs-txid))
(update-graphs-txid! latest-txid graph-uuid repo))
apply-result))))))]
(if (instance? ExceptionInfo r)
@ -623,8 +706,8 @@
(filter-file-change-events-fn [this]
(fn [^FileChangeEvent e] (and (instance? FileChangeEvent e)
(string/starts-with? (.-dir e) base-path)
(not (contains-path? (get-ignore-files this) (-relative-path e)))
(contains-path? (get-monitored-dirs this) (-relative-path e)))))
(not (contains-path? (get-ignore-files this) (relative-path e)))
(contains-path? (get-monitored-dirs this) (relative-path e)))))
(filtered-chan
;; "check base-path"
@ -662,9 +745,9 @@
(when (filter-e-fn e)
(if (= "unlink" (.-type e))
(conj! tcoll e)
(if (<! (file-changed? graph-uuid (-relative-path e) base-path))
(if (<! (file-changed? graph-uuid (relative-path e) base-path))
(conj! tcoll e)
(prn "file unchanged" (-relative-path e)))))
(prn "file unchanged" (relative-path e)))))
(recur timeout-c tcoll))
(nil? e)
@ -675,11 +758,11 @@
(sync-local->remote! [this ^FileChangeEvent e]
(let [type (.-type e)]
(if (contains-path? (get-ignore-files this) (-relative-path e))
(if (contains-path? (get-ignore-files this) (relative-path e))
(go {:succ true}) ; ignore
(do
(prn "sync-local->remote!" e)
(let [path* (-relative-path e)
(let [path* (relative-path e)
r
(cond
(or (= "add" type) (= "change" type))
@ -703,8 +786,7 @@
(do
(println "sync-local->remote! update txid" r*)
;; persist txid
(persist-var/-reset-value! graphs-txid [graph-uuid r*] repo)
(persist-var/persist-save graphs-txid)
(update-graphs-txid! r* graph-uuid repo)
(reset! *txid r*)
{:succ true})
@ -725,9 +807,9 @@
change-events (->> diff-local-files
(mapv
#(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil))
(filterv #(let [relative-path (-relative-path %)]
(and (not (contains-path? ignore-files relative-path))
(contains-path? monitored-dirs relative-path)))))]
(filterv #(let [path (relative-path %)]
(and (not (contains-path? ignore-files path))
(contains-path? monitored-dirs path)))))]
(println "[full-sync]" (count change-events) "files need to sync to remote")
(loop [es change-events]
(if (empty? es)
@ -808,7 +890,7 @@
::full-sync
(<! (.full-sync this))
::remote->local=>local->remote
(<! (.remote->local this ::local-remote args))
(<! (.remote->local this ::local->remote args))
::remote->local=>full-sync
(<! (.remote->local this ::full-sync args))
::stop