59 lines
1.8 KiB
JavaScript
59 lines
1.8 KiB
JavaScript
import Rx, { AsyncSubject, Observable } from 'rx';
|
|
import moment from 'moment';
|
|
import debugFactory from 'debug';
|
|
|
|
const debug = debugFactory('fcc:rxUtils');
|
|
|
|
export function saveInstance(instance) {
|
|
return new Rx.Observable.create(function(observer) {
|
|
if (!instance || typeof instance.save !== 'function') {
|
|
debug('no instance or save method');
|
|
observer.onNext();
|
|
return observer.onCompleted();
|
|
}
|
|
return instance.save(function(err, savedInstance) {
|
|
if (err) {
|
|
return observer.onError(err);
|
|
}
|
|
debug('instance saved');
|
|
observer.onNext(savedInstance);
|
|
return observer.onCompleted();
|
|
});
|
|
});
|
|
}
|
|
|
|
// alias saveInstance
|
|
export const saveUser = saveInstance;
|
|
|
|
// observeQuery(Model: Object, methodName: String, query: Any) => Observable
|
|
export function observeQuery(Model, methodName, query) {
|
|
return Rx.Observable.fromNodeCallback(Model[methodName], Model)(query);
|
|
}
|
|
|
|
// observeMethod(
|
|
// context: Object, methodName: String
|
|
// ) => (query: Any) => Observable
|
|
export function observeMethod(context, methodName) {
|
|
return Rx.Observable.fromNodeCallback(context[methodName], context);
|
|
}
|
|
|
|
// must be bound to an observable instance
|
|
// timeCache(amount: Number, unit: String) => Observable
|
|
export function timeCache(time, unit) {
|
|
const source = this;
|
|
let cache;
|
|
let expireCacheAt;
|
|
return Observable.create(observable => {
|
|
// if there is no expire time set
|
|
// or if expireCacheAt is smaller than now,
|
|
// set new expire time in MS and create new subscription to source
|
|
if (!expireCacheAt || expireCacheAt < Date.now()) {
|
|
// set expire in ms;
|
|
expireCacheAt = moment().add(time, unit).valueOf();
|
|
cache = new AsyncSubject();
|
|
source.subscribe(cache);
|
|
}
|
|
return cache.subscribe(observable);
|
|
});
|
|
}
|