Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 65 additions & 38 deletions lib/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { buildMiddlewareFilter } = require('./helpers/buildMiddlewareFilter');
const stringifyFunctionOperators = require('./helpers/aggregate/stringifyFunctionOperators');
const utils = require('./utils');
const { modelSymbol } = require('./helpers/symbols');
const { getTracingChannel } = require('./helpers/diagnosticsChannel');
const read = Query.prototype.read;
const readConcern = Query.prototype.readConcern;

Expand Down Expand Up @@ -1064,58 +1065,84 @@ Aggregate.prototype.exec = async function exec() {
throw new MongooseError('Aggregate.prototype.exec() no longer accepts a callback');
}

if (this._connection) {
if (!this._pipeline.length) {
throw new MongooseError('Aggregate has empty pipeline');
const tracingChannel = getTracingChannel('mongoose:aggregate:exec');
const useTracing = tracingChannel && tracingChannel.hasSubscribers;
let tracingCtx;
if (useTracing) {
tracingCtx = {
operation: 'aggregate',
collection: (this._model && this._model.collection && this._model.collection.name) || '',
model: (this._model && this._model.modelName) || '',
pipeline: this._pipeline
};
if (tracingChannel.start) {
tracingChannel.start.publish(tracingCtx);
}
}

this._optionsForExec();
try {
if (this._connection) {
if (!this._pipeline.length) {
throw new MongooseError('Aggregate has empty pipeline');
}

const cursor = await this._connection.client.db().aggregate(this._pipeline, this.options);
return await cursor.toArray();
}
this._optionsForExec();

const model = this._model;
const collection = this._model.collection;
const cursor = await this._connection.client.db().aggregate(this._pipeline, this.options);
return await cursor.toArray();
}

applyGlobalMaxTimeMS(this.options, model.db.options, model.base.options);
applyGlobalDiskUse(this.options, model.db.options, model.base.options);
const model = this._model;
const collection = this._model.collection;

this._optionsForExec();
applyGlobalMaxTimeMS(this.options, model.db.options, model.base.options);
applyGlobalDiskUse(this.options, model.db.options, model.base.options);

if (this.options?.cursor) {
return new AggregationCursor(this);
}
this._optionsForExec();

prepareDiscriminatorPipeline(this._pipeline, this._model.schema);
stringifyFunctionOperators(this._pipeline);
if (this.options?.cursor) {
return new AggregationCursor(this);
}

const preFilter = buildMiddlewareFilter(this.options, 'pre');
const postFilter = buildMiddlewareFilter(this.options, 'post');
prepareDiscriminatorPipeline(this._pipeline, this._model.schema);
stringifyFunctionOperators(this._pipeline);

try {
await model.hooks.execPre('aggregate', this, [], { filter: preFilter });
} catch (error) {
return await model.hooks.execPost('aggregate', this, [null], { error, filter: postFilter });
}
const preFilter = buildMiddlewareFilter(this.options, 'pre');
const postFilter = buildMiddlewareFilter(this.options, 'post');

if (!this._pipeline.length) {
throw new MongooseError('Aggregate has empty pipeline');
}
try {
await model.hooks.execPre('aggregate', this, [], { filter: preFilter });
} catch (error) {
return await model.hooks.execPost('aggregate', this, [null], { error, filter: postFilter });
}

const options = clone(this.options || {});
delete options.middleware;
if (!this._pipeline.length) {
throw new MongooseError('Aggregate has empty pipeline');
}

let result;
try {
const cursor = await collection.aggregate(this._pipeline, options);
result = await cursor.toArray();
} catch (error) {
return await model.hooks.execPost('aggregate', this, [null], { error, filter: postFilter });
}
const options = clone(this.options || {});
delete options.middleware;

await model.hooks.execPost('aggregate', this, [result], { error: null, filter: postFilter });
return result;
let result;
try {
const cursor = await collection.aggregate(this._pipeline, options);
result = await cursor.toArray();
} catch (error) {
return await model.hooks.execPost('aggregate', this, [null], { error, filter: postFilter });
}

await model.hooks.execPost('aggregate', this, [result], { error: null, filter: postFilter });
return result;
} catch (err) {
if (useTracing && tracingChannel && tracingChannel.error) {
tracingChannel.error.publish(Object.assign({}, tracingCtx, { error: err }));
}
throw err;
} finally {
if (useTracing && tracingChannel && tracingChannel.asyncEnd) {
tracingChannel.asyncEnd.publish(tracingCtx);
}
}
};

/**
Expand Down
39 changes: 39 additions & 0 deletions lib/helpers/diagnosticsChannel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

/*!
 * Safe access to node:diagnostics_channel tracingChannel.
 * Returns null if unavailable (Node < 20, Deno, or load failure).
 */

// Cached `tracingChannel` factory. Three states:
//   null  -> not probed yet
//   false -> probed, unavailable (negative result cached so we never re-probe)
//   fn    -> probed, available
let _tracingChannel = null;

/**
 * Lazily resolve `diagnostics_channel.tracingChannel`, caching the result
 * (positive or negative) after the first probe.
 *
 * Fix: the previous version returned the cached sentinel `false` on repeat
 * calls after a failed probe, while first-call failure returned `null` —
 * callers only worked because they used a falsy check. Now the failure
 * path always returns `null`.
 *
 * @returns {Function|null} the `tracingChannel` factory, or `null` when
 *   diagnostics_channel tracing is unavailable in this runtime
 */
function getTracingChannelFn() {
  if (_tracingChannel === null) {
    // Assume unavailable; overwrite only on a successful probe.
    _tracingChannel = false;
    try {
      const dc = require('node:diagnostics_channel');
      if (typeof dc.tracingChannel === 'function') {
        _tracingChannel = dc.tracingChannel;
      }
    } catch {
      /* diagnostics_channel unavailable */
    }
  }
  return _tracingChannel || null;
}

/**
 * Create (or retrieve) a named TracingChannel.
 *
 * @param {String} name channel name, e.g. `'mongoose:model:save'`
 * @returns {Object|null} the TracingChannel, or `null` when tracing is
 *   unavailable or channel creation throws
 */
function getTracingChannel(name) {
  const fn = getTracingChannelFn();
  if (fn == null) {
    return null;
  }
  try {
    return fn(name);
  } catch {
    return null;
  }
}

module.exports = { getTracingChannel };
55 changes: 52 additions & 3 deletions lib/helpers/populate/getModelsMapForPopulate.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ module.exports = function getModelsMapForPopulate(model, docs, options) {
return err;
}
}
return map;
return _finalizeMapForPerDocumentLimit(map);

function _getModelNames(doc, schemaType, modelNameFromQuery, model) {
let modelNames;
Expand Down Expand Up @@ -596,7 +596,7 @@ function _virtualPopulate(model, docs, options, _virtualRes) {
}
}

return map;
return _finalizeMapForPerDocumentLimit(map);
}

/*!
Expand Down Expand Up @@ -653,7 +653,7 @@ function addModelNamesToMap(model, map, available, modelNames, options, data, re
get(options, 'options.perDocumentLimit', null) :
options.perDocumentLimit;

if (!available[modelName] || perDocumentLimit != null) {
if (!available[modelName]) {
const currentOptions = {
model: Model
};
Expand Down Expand Up @@ -699,6 +699,55 @@ function addModelNamesToMap(model, map, available, modelNames, options, data, re
}
}

/**
 * Post-process the populate map to implement `perDocumentLimit`.
 *
 * Entries without `perDocumentLimit` pass through untouched. For entries
 * that do have it, either rewrite the option into a plain `limit` so a
 * single query can serve every parent doc, or split the entry back into
 * one sub-entry per parent doc (one query each) when the single-query
 * rewrite is not known to be safe.
 *
 * @param {Array} map populate map entries as built by the enclosing callers
 * @returns {Array} finalized map (may contain more entries than `map`)
 * @private
 */
function _finalizeMapForPerDocumentLimit(map) {
  const finalizedMap = [];
  const len = map.length;
  for (let i = 0; i < len; ++i) {
    const entry = map[i];
    // `perDocumentLimit` may be set at the top level of the populate
    // options or nested under `options.options`; top level wins.
    const perDocumentLimit = entry.options.perDocumentLimit == null ?
      get(entry.options, 'options.perDocumentLimit', null) :
      entry.options.perDocumentLimit;

    if (perDocumentLimit != null) {
      const allIdsCount = utils.array.unique(utils.array.flatten(entry.allIds)).length;
      const numDocs = entry.docs.length;

      // NOTE(review): the `allIdsCount <= numDocs * perDocumentLimit` bound
      // assumes a single query with `limit = perDocumentLimit` can still
      // return enough docs for every parent (i.e. downstream scales the
      // limit by doc count, or the ids all fit) — confirm against
      // _execPopulateQuery before relying on it. Skip must be absent in
      // either options location for the one-query rewrite to be valid.
      if (!entry.isVirtual && !entry.count && allIdsCount <= numDocs * perDocumentLimit && entry.options.skip == null && entry.options.options?.skip == null) {
        // Optimization #1: Safe to fetch all in one query
        entry.options.limit = perDocumentLimit;
        // Important: we MUST delete perDocumentLimit so _execPopulateQuery doesn't split it again locally
        delete entry.options.perDocumentLimit;
        if (entry.options.options) {
          delete entry.options.options.perDocumentLimit;
        }
        finalizedMap.push(entry);
      } else {
        // Fallback: split back into individual queries to maintain exact old behavior
        for (let j = 0; j < entry.docs.length; ++j) {
          // Shallow copy; docs/ids/allIds/unpopulatedValues are assumed to
          // be parallel arrays indexed by parent doc — TODO confirm.
          const splitEntry = { ...entry };
          splitEntry.docs = [entry.docs[j]];
          splitEntry.ids = [entry.ids[j]];
          splitEntry.allIds = [entry.allIds[j]];
          splitEntry.unpopulatedValues = [entry.unpopulatedValues[j]];
          // A per-doc `match` array is distributed one element per split.
          if (Array.isArray(entry.match)) {
            splitEntry.match = entry.match[j];
          }
          // Clone options so downstream mutation of one split entry's
          // options cannot leak into its siblings.
          splitEntry.options = clone(entry.options);
          finalizedMap.push(splitEntry);
        }
      }
    } else {
      finalizedMap.push(entry);
    }
  }
  return finalizedMap;
}

function _getModelFromConn(conn, modelName) {
/* If this connection has a parent from `useDb()`, bubble up to parent's models */
if (conn.models[modelName] == null && conn._parent != null) {
Expand Down
Empty file.
26 changes: 24 additions & 2 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const minimize = require('./helpers/minimize');
const MongooseBulkSaveIncompleteError = require('./error/bulkSaveIncompleteError');
const ObjectExpectedError = require('./error/objectExpected');
const decorateBulkWriteResult = require('./helpers/model/decorateBulkWriteResult');
const { getTracingChannel } = require('./helpers/diagnosticsChannel');
const modelCollectionSymbol = Symbol('mongoose#Model#collection');
const modelDbSymbol = Symbol('mongoose#Model#db');
const {
Expand Down Expand Up @@ -660,19 +661,40 @@ Model.prototype.save = async function save(options) {

this.$__.saveOptions = options;

const tracingChannel = getTracingChannel('mongoose:model:save');
const useTracing = tracingChannel && tracingChannel.hasSubscribers;
let tracingCtx;
if (useTracing) {
const coll = this.constructor && this.constructor.collection;
tracingCtx = {
operation: 'save',
collection: (coll && coll.name) || '',
model: (this.constructor && this.constructor.modelName) || '',
doc: this._id != null ? { _id: this._id } : undefined
};
if (tracingChannel.start) {
tracingChannel.start.publish(tracingCtx);
}
}

try {
await this.$__save(options);
return this;
} catch (error) {
if (useTracing && tracingChannel && tracingChannel.error) {
tracingChannel.error.publish(Object.assign({}, tracingCtx, { error }));
}
this.$__handleReject(error);
throw error;
} finally {
if (useTracing && tracingChannel && tracingChannel.asyncEnd) {
tracingChannel.asyncEnd.publish(tracingCtx);
}
this.$__.saving = null;
this.$__.saveOptions = null;
this.$__.$versionError = null;
this.$op = null;
}

return this;
};

Model.prototype.$save = Model.prototype.save;
Expand Down
Loading