freeCodeCamp/server/utils/rx.js

59 lines
1.8 KiB
JavaScript
Raw Normal View History

2016-04-05 21:45:09 +00:00
import Rx, { AsyncSubject, Observable } from 'rx';
import moment from 'moment';
2015-10-02 18:47:36 +00:00
import debugFactory from 'debug';
2015-06-21 02:55:22 +00:00
2016-01-27 19:34:44 +00:00
const debug = debugFactory('fcc:rxUtils');
2015-10-02 18:47:36 +00:00
// saveInstance(instance: Object) => Observable
//
// Wraps the node-style `instance.save(callback)` call in an observable.
// - If `instance` is falsy or has no `save` function, the observable emits a
//   single `undefined` value and completes without saving anything.
// - If `save` reports an error, the observable errors with it.
// - Otherwise it emits the saved instance and completes.
// The save call is made inside the subscribe function handed to
// `Rx.Observable.create`, i.e. when the returned observable is subscribed to.
export function saveInstance(instance) {
  // `Observable.create` is a factory function, so it is called without `new`.
  return Rx.Observable.create(function(observer) {
    if (!instance || typeof instance.save !== 'function') {
      debug('no instance or save method');
      observer.onNext();
      return observer.onCompleted();
    }
    return instance.save(function(err, savedInstance) {
      if (err) {
        return observer.onError(err);
      }
      debug('instance saved');
      observer.onNext(savedInstance);
      return observer.onCompleted();
    });
  });
}
2015-06-21 02:55:22 +00:00
2015-06-25 22:03:46 +00:00
// saveUser is an alias of saveInstance: both exported names refer to the
// same function.
2015-10-02 18:47:36 +00:00
export const saveUser = saveInstance;
2015-06-25 22:03:46 +00:00
// observeQuery(Model: Object, methodName: String, query: Any) => Observable
//
// Looks up the node-style (callback-last) method `Model[methodName]`, wraps it
// with `Rx.Observable.fromNodeCallback` using `Model` as its context, and
// immediately invokes the wrapped function with `query`.
export function observeQuery(Model, methodName, query) {
  const method = Model[methodName];
  const runAsObservable = Rx.Observable.fromNodeCallback(method, Model);
  return runAsObservable(query);
}
2015-06-23 02:23:07 +00:00
// observeMethod(
//   context: Object, methodName: String
// ) => (query: Any) => Observable
//
// Wraps the node-style method `context[methodName]` with
// `Rx.Observable.fromNodeCallback`, using `context` as its context, and
// returns the wrapped function without calling it.
export function observeMethod(context, methodName) {
  const method = context[methodName];
  const wrapped = Rx.Observable.fromNodeCallback(method, context);
  return wrapped;
}
2016-04-05 21:45:09 +00:00
// timeCache(time: Number, unit: String) => Observable
//
// Must be called with `this` bound to the source observable.
// Returns an observable that shares one subscription to the source among all
// subscribers arriving within the cache window. The first subscriber (or the
// first one after the window has expired) subscribes a fresh AsyncSubject to
// the source; every subscriber is then attached to the current subject.
// `time` and `unit` are passed straight to `moment().add(time, unit)` to
// compute when the window ends.
export function timeCache(time, unit) {
  const upstream = this;
  let cachedSubject;
  let expiresAt;

  // True when no window has been opened yet or the current one has ended.
  const isStale = () => !expiresAt || expiresAt < Date.now();

  return Observable.create(observer => {
    if (isStale()) {
      // Open a new window: record its end time in ms since the epoch and
      // start a new subscription to the source.
      expiresAt = moment().add(time, unit).valueOf();
      cachedSubject = new AsyncSubject();
      upstream.subscribe(cachedSubject);
    }
    return cachedSubject.subscribe(observer);
  });
}