save users into new `user` collection

pull/18182/head
Berkeley Martinez 2015-06-07 22:06:57 -07:00
parent 38931dd23d
commit 0e240ba630
1 changed files with 23 additions and 8 deletions

View File

@ -18,10 +18,10 @@ function createConnection(URI) {
});
}
function createQuery(db, collection, selection, options, batchSize) {
function createQuery(db, collection, options, batchSize) {
return Rx.Observable.create(function (observer) {
console.log('Creating cursor...');
var cursor = db.collection(collection).find(selection, options);
var cursor = db.collection(collection).find({}, options);
cursor.batchSize(batchSize || 20);
// Cursor.each will yield all doc from a batch in the same tick,
// or schedule getting next batch on nextTick
@ -42,9 +42,9 @@ function createQuery(db, collection, selection, options, batchSize) {
});
}
function saveUser(user) {
function insertMany(db, collection, users, options) {
return Rx.Observable.create(function(observer) {
user.save(function(err) {
db.collection(collection).insertMany(users, options, function(err) {
if (err) {
return observer.onError(err);
}
@ -54,21 +54,36 @@ function saveUser(user) {
}
var count = 0;
createConnection(secrets.db)
// will supply our db object
var dbObservable = createConnection(secrets.db).shareReplay();
dbObservable
.flatMap(function(db) {
// returns user document, n users per loop where n is the batchsize.
return createQuery(db, 'users', {});
})
.map(function(user) {
// flatten user
assign(user, user.portfolio, user.profile);
return user;
})
.flatMap(function(user) {
return saveUser(user);
// batch them into arrays of twenty documents
.bufferWithCount(20)
// get bd object ready for insert
.withLatestFrom(dbObservable, function(users, db) {
return {
users: users,
db: db
};
})
.flatMap(function(dats) {
// bulk insert into new collection for loopback
return insertMany(dats.db, 'user', dats.users, { w: 1 });
})
// count how many times insert completes
.count()
.subscribe(
function(_count) {
count = _count;
count = _count * 20;
},
function(err) {
console.log('an error occured', err);