diff --git a/scratch.js b/scratch.js
deleted file mode 100644
index a03b37a55590dc..00000000000000
--- a/scratch.js
+++ /dev/null
@@ -1,50 +0,0 @@
-/* jshint node: true */
-var elasticsearch = require('../elasticsearch-js');
-var async = require('async');
-
-var es = new elasticsearch.Client({
- host: 'localhost:9200',
- sniffOnStart: true,
- sniffInterval: 3000,
- apiVersion: '1.0',
- log: 'trace'
-});
-
-var rl = require('readline').createInterface({
- input: process.stdin,
- output: process.stdout,
- terminal: true
-});
-
-async.series([
- function (done) {
- setTimeout(done, 50);
- },
- function (done) {
- console.log(es.transport.connectionPool._conns.index);
- es.indices.create({
- index: 'index_name'
- }, done);
- },
- function (done) {
- rl.question('Is the master down?', function () {
- done();
- });
- },
- function (done) {
- console.log(es.transport.connectionPool._conns.index);
- es.search({ index: 'index_name' }, done);
- },
- function (done) {
- rl.question('Is the slave down now?', function () {
- es.search({ body: { query: { match_all: {} } } }, done);
- });
- },
- function (done) {
- rl.question('Is the master back up?', function () {
- es.search({ body: { query: { match_all: {} } } }, done);
- });
- }
-], function (err) {
- console.log(err);
-});
\ No newline at end of file
diff --git a/src/courier/courier.js b/src/courier/courier.js
index d41269bf3e3d20..39e7d497bae385 100644
--- a/src/courier/courier.js
+++ b/src/courier/courier.js
@@ -1,287 +1,257 @@
define(function (require) {
- var DataSource = require('courier/data_source');
- var Docs = require('courier/docs');
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
var angular = require('angular');
- function chain(cntx, method) {
- return function () {
- method.apply(cntx, arguments);
- return this;
- };
- }
-
- function emitError(source, courier, error) {
- if (EventEmitter.listenerCount(source, 'error')) {
- source.emit('error', error);
- } else {
- courier.emit('error', error);
- }
- }
-
- function mergeProp(state, filters, val, key) {
- switch (key) {
- case 'inherits':
- case '_type':
- // ignore
- return;
- case 'filter':
- filters.push(val);
- return;
- case 'index':
- case 'type':
- if (key && state[key] == null) {
- state[key] = val;
- }
- return;
- case 'source':
- key = '_source';
- /* fall through */
- }
-
- if (key && state.body[key] == null) {
- state.body[key] = val;
- }
- }
-
- function flattenDataSource(source) {
- var state = {
- body: {}
- };
-
- // all of the filters from the source chain
- var filters = [];
- var collectProp = _.partial(mergeProp, state, filters);
-
- // walk the chain and merge each property
- var current = source;
- var currentState;
- while (current) {
- currentState = current._state();
- _.forOwn(currentState, collectProp);
- current = currentState.inherits;
- }
-
- // defaults for the query
- _.forOwn({
- query: {
- 'match_all': {}
+ var DocSource = require('courier/data_source/doc');
+ var SearchSource = require('courier/data_source/search');
+ var HastyRefresh = require('courier/errors').HastyRefresh;
+
+ // map constructors to type keywords
+ var sourceTypes = {
+ doc: DocSource,
+ search: SearchSource
+ };
+
+ // fetch process for the two source types
+ var onFetch = {
+ // execute a search right now
+ search: function (courier) {
+ if (courier._activeSearchRequest) {
+ return courier._error(new HastyRefresh());
}
- }, collectProp);
-
- // switch to filtered query if there are filters
- if (filters.length) {
- state.body.query = {
- filtered: {
- query: state.body.query,
- filter: {
- bool: {
- must: filters
- }
- }
+ courier._activeSearchRequest = SearchSource.fetch(
+ courier,
+ courier._refs.search,
+ function (err) {
+ if (err) return courier._error(err);
+ });
+ },
+
+ // validate that all of the DocSource objects are up to date
+ // then fetch the onces that are not
+ doc: function (courier) {
+ DocSource.validate(courier, courier._refs.doc, function (err, invalid) {
+ if (err) {
+ courier.stop();
+ return courier.emit('error', err);
}
- };
- }
- return state;
- }
+ // if all of the docs are up to date we don't need to do anything else
+ if (invalid.length === 0) return;
- function fetchSearchResults(courier, client, sources, cb) {
- if (!client) {
- this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
- return;
+ DocSource.fetch(courier, invalid, function (err) {
+ if (err) return courier._error(err);
+ });
+ });
}
+ };
- var all = [];
- var body = '';
- _.each(sources, function (source) {
- if (source.getType() !== 'search') {
- return;
- }
- all.push(source);
-
- var state = flattenDataSource(source);
- var header = JSON.stringify({
- index: state.index,
- type: state.type
- });
- var doc = JSON.stringify(state.body);
+ // default config values
+ var defaults = {
+ fetchInterval: 30000,
+ docInterval: 2500
+ };
- body += header + '\n' + doc + '\n';
- });
+ /**
+ * Federated query service, supports two data source types: doc and search.
+ *
+ * search:
+ * - inherits filters, and other query properties
+ * - automatically emit results on a set interval
+ * doc:
+ * - tracks doc versions
+ * - emits same results event when the doc is updated
+ * - helps seperate versions of kibana running on the same machine stay in sync
+ * - (NI) tracks version and uses it when new versions of a doc are reindexed
+ * - (NI) helps deal with conflicts
+ *
+ * @param {object} config
+ * @param {Client} config.client - The elasticsearch.js client to use for querying. Should be
+ * setup and ready to go.
+ * @param {EsClient} [config.client] - The elasticsearch client that the courier should use
+ * (can be set at a later time with the `.client()` method)
+ * @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult
+ * is 30 seconds)
+ */
+ function Courier(config) {
+ if (!(this instanceof Courier)) return new Courier(config);
- return client.msearch({ body: body }, function (err, resp) {
- if (err) return cb(err);
+ config = _.defaults(config || {}, defaults);
- _.each(resp.responses, function (resp, i) {
- var source = sources[i];
- if (resp.error) return emitError(source, courier, resp);
- source.emit('results', resp);
- });
+ this._client = config.client;
- cb(err, resp);
+ // array's to store references to individual sources of each type
+ // wrapped in some metadata
+ this._refs = _.transform(sourceTypes, function (refs, fn, type) {
+ refs[type] = [];
});
- }
- function fetchDocs(courier, client, sources, cb) {
- if (!client) {
- this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
- return;
- }
+ // stores all timer ids
+ this._timer = {};
- var all = [];
- var body = {
- docs: []
- };
-
- _.each(sources, function (source) {
- if (source.getType() !== 'get') {
- return;
- }
-
- all.push(source);
-
- var state = flattenDataSource(source);
- body.docs.push({
- index: state.index,
- type: state.type,
- id: state.id
- });
- });
+ // interval times for each type
+ this._interval = {};
- return client.mget({ body: body }, function (err, resp) {
- if (err) return cb(err);
+ // interval hook/fn for each type
+ this._onInterval = {};
- _.each(resp.responses, function (resp, i) {
- var source = sources[i];
- if (resp.error) return emitError(source, courier, resp);
- source.emit('results', resp);
- });
+ _.each(sourceTypes, function (fn, type) {
+ var courier = this;
+ // the name used outside of this module
+ var publicName;
+ if (type === 'search') {
+ publicName = 'fetchInterval';
+ } else {
+ publicName = type + 'Interval';
+ }
- cb(err, resp);
- });
- }
+ // store the config value passed in for this interval
+ this._interval[type] = config[publicName];
- function saveUpdate(source, fields) {
+ // store a quick "bound" method for triggering
+ this._onInterval[type] = function () {
+ onFetch[type](courier);
+ courier._schedule(type);
+ };
+ // create a public setter for this interval type
+ this[publicName] = function (val) {
+ courier._interval[type] = val;
+ courier._schedule(type);
+ return this;
+ };
+ }, this);
}
+ inherits(Courier, EventEmitter);
/**
- * Federated query service, supports data sources that inherit properties
- * from one another and automatically emit results.
- * @param {object} config
- * @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go.
- * @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult is 30 seconds)
+ * PUBLIC API
*/
- function Courier(config) {
- if (!(this instanceof Courier)) return new Courier(config);
- var opts = {
- fetchInterval: 30000
- };
- var fetchTimer;
- var activeRequest;
- var courier = this;
- var sources = {
- search: [],
- get: []
- };
-
- function doSearch() {
- if (!opts.client) {
- this.emit('error', new Error('Courier does not have a client, pass it ' +
- 'in to the constructor or set it with the .client() method'));
- return;
- }
- if (activeRequest) {
- activeRequest.abort();
- stopFetching();
- this.emit('error', new errors.HastyRefresh());
- return;
- }
- // we need to catch the original promise in order to keep it's abort method
- activeRequest = fetchSearchResults(courier, opts.client, sources.search, function (err, resp) {
- activeRequest = null;
- setFetchTimeout();
-
- if (err) {
- window.console && console.log(err);
- }
- });
- }
-
- function setFetchTimeout() {
- clearTimeout(fetchTimer);
- if (opts.fetchInterval) {
- fetchTimer = setTimeout(doSearch, opts.fetchInterval);
- } else {
- fetchTimer = null;
- }
- }
-
- function stopFetching(type) {
- clearTimeout(fetchTimer);
- }
-
- // start using a DataSource in fetches/updates
- function openDataSource(source) {
- var type = source.getType();
- if (~sources[type].indexOf(source)) return false;
- sources[type].push(source);
+ // start fetching results on an interval
+ Courier.prototype.start = function () {
+ if (!this.running()) {
+ this._schedule('doc');
+ this._schedule('search');
+ this.fetch();
}
+ return this;
+ };
+
+ // is the courier currently running?
+ Courier.prototype.running = function () {
+ return !!this._fetchTimer;
+ };
+
+ // stop the courier from fetching more results
+ Courier.prototype.stop = function () {
+ this._clearScheduled('search');
+ this._clearScheduled('doc');
+ };
+
+ // close the courier, stopping it from refreshing and
+ // closing all of the sources
+ Courier.prototype.close = function () {
+ _.each(sourceTypes, function (fn, type) {
+ this._refs[type].forEach(function (ref) {
+ this._closeDataSource(ref.source);
+ }, this);
+ }, this);
+ };
- // stop using a DataSource in fetches/updates
- function closeDataSource(source) {
- var type = source.getType();
- var i = sources[type].indexOf(source);
- if (i === -1) return;
- sources[type].slice(i, 1);
- // only search DataSources get fetched automatically
- if (type === 'search' && sources.search.length === 0) stopFetching();
+ // force a fetch of all datasources right now
+ Courier.prototype.fetch = function () {
+ _.forOwn(onFetch, function (method, type) {
+ method(this);
+ }, this);
+ };
+
+ // data source factory
+ Courier.prototype.createSource = function (type, initialState) {
+ type = type || 'search';
+ if ('function' !== typeof sourceTypes[type]) throw new TypeError(
+ 'Invalid source type ' + type
+ );
+ var Constructor = sourceTypes[type];
+ return new Constructor(this, initialState);
+ };
+
+ /*****
+ * PRIVATE API
+ *****/
+
+ // handle errors in a standard way. The only errors that should make it here are
+ // - issues with msearch/mget syntax
+ // - unable to reach ES
+ // - HastyRefresh
+ Courier.prototype._error = function (err) {
+ this.stop();
+ return this.emit('error', err);
+ };
+
+ // every time a child object (DataSource, Mapper) needs the client, it should
+ // call _getClient
+ Courier.prototype._getClient = function () {
+ if (!this._client) throw new Error('Client is not set on the Courier yet.');
+ return this._client;
+ };
+
+ // start using a DocSource in fetches/updates
+ Courier.prototype._openDataSource = function (source) {
+ var refs = this._refs[source._getType()];
+ if (!_.find(refs, { source: source })) {
+ refs.push({
+ source: source
+ });
}
-
- // has the courier been started?
- function isRunning() {
- return !!fetchTimer;
+ };
+
+ // stop using a DataSource in fetches/updates
+ Courier.prototype._closeDataSource = function (source) {
+ var type = source._getType();
+ var refs = this._refs[type];
+ _(refs).where({ source: source }).each(_.partial(_.pull, refs));
+ if (refs.length === 0) this._clearScheduled(type);
+ };
+
+ // schedule a fetch after fetchInterval
+ Courier.prototype._schedule = function (type) {
+ this._clearScheduled(type);
+ if (this._interval[type]) {
+ this._timer[type] = setTimeout(this._onInterval[type], this._interval[type]);
}
-
- // chainable public api
- this.start = chain(this, doSearch);
- this.running = chain(this, isRunning);
- this.stop = chain(this, stopFetching);
- this.close = chain(this, function () { _(sources.search).each(closeDataSource); });
- this.openDataSource = chain(this, openDataSource);
- this.closeDataSource = chain(this, closeDataSource);
-
- // setters
- this.client = chain(this, function (client) {
- opts.client = client;
+ };
+
+ // properly clear scheduled fetches
+ Courier.prototype._clearScheduled = function (type) {
+ this._timer[type] = clearTimeout(this._timer[type]);
+ };
+
+ // alert the courior that a doc has been updated
+ // and that it should update matching docs
+ Courier.prototype._docUpdated = function (source) {
+ var updated = source._state;
+
+ _.each(this._refs.doc, function (ref) {
+ var state = ref.source._state;
+ if (
+ state === updated
+ || (
+ state.id === updated.id
+ && state.type === updated.type
+ && state.index === updated.index
+ )
+ ) {
+ delete ref.version;
+ }
});
- this.fetchInterval = function (val) {
- opts.fetchInterval = val;
- if (isRunning()) setFetchTimeout();
- return this;
- };
-
- // factory
- this.createSource = function (type, initialState) {
- return new DataSource(this, type, initialState);
- };
-
- // apply the passed in config
- _.each(config || {}, function (val, key) {
- if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
- this[key](val);
- }, this);
- }
- inherits(Courier, EventEmitter);
- // private api, exposed for testing
- Courier._flattenDataSource = flattenDataSource;
+ onFetch.doc(this);
+ };
return Courier;
});
\ No newline at end of file
diff --git a/src/courier/data_source.js b/src/courier/data_source.js
deleted file mode 100644
index a2ed3cd0211f55..00000000000000
--- a/src/courier/data_source.js
+++ /dev/null
@@ -1,128 +0,0 @@
-define(function (require) {
- var inherits = require('utils/inherits');
- var _ = require('lodash');
- var EventEmitter = require('utils/event_emitter');
- var Mapper = require('courier/mapper');
- var IndexPattern = require('courier/index_pattern');
-
- // polyfill for older versions of node
- function listenerCount(emitter, event) {
- if (EventEmitter.listenerCount) {
- return EventEmitter.listenerCount(emitter, event);
- } else {
- return this.listeners(event).length;
- }
- }
-
- var apiMethods = {
- search: [
- 'index',
- 'type',
- 'query',
- 'filter',
- 'sort',
- 'highlight',
- 'aggs',
- 'from',
- 'size',
- 'source',
- 'inherits'
- ],
- get: [
- 'index',
- 'type',
- 'id',
- 'sourceInclude',
- 'sourceExclude'
- ]
- };
-
- function DataSource(courier, type, initialState) {
- var state;
-
- if (initialState) {
- // state can be serialized as JSON, and passed back in to restore
- if (typeof initialState === 'string') {
- state = JSON.parse(initialState);
- } else {
- state = _.cloneDeep(initialState);
- }
- if (state._type) {
- if (type && type !== state._type) {
- throw new Error('Initial state is not of the type specified for this DataSource');
- } else {
- type = state._type;
- }
- }
- } else {
- state = {};
- }
-
- type = type || 'search';
- if (!_.has(apiMethods, type)) {
- throw new TypeError('Invalid DataSource type ' + type);
- }
- state._type = type;
-
- var mapper = new Mapper();
-
- var onNewListener = _.bind(function (name) {
- // new newListener is emitted before it is added, count will be 0
- if (name !== 'results' || listenerCount(this, 'results') !== 0) return;
- courier.openDataSource(this);
- this.removeListener('newListener', onNewListener);
- this.on('removeListener', onRemoveListener);
- }, this);
-
- var onRemoveListener = _.bind(function () {
- if (listenerCount(this, 'results') > 0) return;
- courier.closeDataSource(this);
- this.removeListener('removeListener', onRemoveListener);
- this.on('newListener', onNewListener);
- }, this);
-
- this.on('newListener', onNewListener);
-
- /**
- * Used to flatten a chain of DataSources
- * @return {object} - simple object containing all of the
- * sources internal state
- */
- this._state = function () {
- return state;
- };
-
- // public api
- this.toJSON = function () {
- return _.omit(state, 'inherits');
- };
- this.toString = function () {
- return JSON.stringify(this.toJSON());
- };
- this.getFieldNames = function (cb) {
- mapper.getMapping(state.index, state.type, function (mapping) {
- return _.keys(mapping);
- });
- };
- this.getType = function () {
- return state._type;
- };
- this.extend = function () {
- return courier.createSource(type).inherits(this);
- };
-
- // get/set internal state values
- apiMethods[type].forEach(function (name) {
- this[name] = function (val) {
- state[name] = val;
- if (name === 'index' && arguments[1]) {
- state.index = new IndexPattern(val, arguments[1]);
- }
- return this;
- };
- }, this);
- }
- inherits(DataSource, EventEmitter);
-
- return DataSource;
-});
\ No newline at end of file
diff --git a/src/courier/data_source/data_source.js b/src/courier/data_source/data_source.js
new file mode 100644
index 00000000000000..8f51029da50c78
--- /dev/null
+++ b/src/courier/data_source/data_source.js
@@ -0,0 +1,160 @@
+define(function (require) {
+ var inherits = require('utils/inherits');
+ var _ = require('lodash');
+ var EventEmitter = require('utils/event_emitter');
+ var Mapper = require('courier/mapper');
+ var IndexPattern = require('courier/index_pattern');
+
+ function DataSource(courier, initialState) {
+ var state;
+
+ EventEmitter.call(this);
+
+ // state can be serialized as JSON, and passed back in to restore
+ if (initialState) {
+ if (typeof initialState === 'string') {
+ state = JSON.parse(initialState);
+ } else {
+ state = _.cloneDeep(initialState);
+ }
+ } else {
+ state = {};
+ }
+
+ this._state = state;
+ this._courier = courier;
+
+ var onNewListener = _.bind(function (name) {
+ // new newListener is emitted before it is added, count will be 0
+ if (name !== 'results' || EventEmitter.listenerCount(this, 'results') !== 0) return;
+ courier._openDataSource(this);
+ this.removeListener('newListener', onNewListener);
+ this.on('removeListener', onRemoveListener);
+ }, this);
+
+ var onRemoveListener = _.bind(function () {
+ if (EventEmitter.listenerCount(this, 'results') > 0) return;
+ courier._closeDataSource(this);
+ this.removeListener('removeListener', onRemoveListener);
+ this.on('newListener', onNewListener);
+ }, this);
+
+ this.on('newListener', onNewListener);
+
+ this.extend = function () {
+ return courier.createSource(this._getType()).inherits(this);
+ };
+
+ // get/set internal state values
+ this._methods.forEach(function (name) {
+ this[name] = function (val) {
+ state[name] = val;
+ if (name === 'index' && arguments[1]) {
+ state.index = new IndexPattern(val, arguments[1]);
+ }
+ return this;
+ };
+ }, this);
+ }
+ inherits(DataSource, EventEmitter);
+
+ /*****
+ * PUBLIC API
+ *****/
+
+ /**
+ * fetch the field names for this DataSource
+ * @param {Function} cb
+ * @callback {Error, Array} - calls cb with a possible error or an array of field names
+ * @todo
+ */
+ DataSource.prototype.getFieldNames = function (cb) {
+ throw new Error('not implemented');
+ };
+
+ /**
+ * flatten an object to a simple encodable object
+ * @return {[type]} [description]
+ */
+ DataSource.prototype.toJSON = function () {
+ return _.omit(this._state, 'inherits');
+ };
+
+ /**
+ * Create a string representation of the object
+ * @return {[type]} [description]
+ */
+ DataSource.prototype.toString = function () {
+ return JSON.stringify(this.toJSON());
+ };
+
+ /*****
+ * PRIVATE API
+ *****/
+
+ /**
+ * Handle errors by allowing them to bubble from the DataSource
+ * to the courior. Maybe we should walk the inheritance chain too.
+ * @param {Error} err - The error that occured
+ * @return {undefined}
+ */
+ DataSource.prototype._error = function (err) {
+ if (EventEmitter.listenerCount(this, 'error')) {
+ this.emit('error', err);
+ } else {
+ this._courier.emit('error', err);
+ }
+ };
+
+ /**
+ * Walk the inheritance chain of a source and return it's
+ * flat representaion (taking into account merging rules)
+ * @return {object} - the flat state of the DataSource
+ */
+ DataSource.prototype._flatten = function () {
+ var type = this._getType();
+ // the merged state of this dataSource and it's ancestors
+ var flatState = {};
+
+ var collectProp = _.partial(this._mergeProp, flatState);
+
+ // walk the chain and merge each property
+ var current = this;
+ var currentState;
+ while (current) {
+ currentState = current._state;
+ _.forOwn(currentState, collectProp);
+ current = currentState.inherits;
+ }
+
+ if (type === 'search') {
+ // defaults for the query
+ _.forOwn({
+ query: {
+ 'match_all': {}
+ }
+ }, collectProp);
+
+ // switch to filtered query if there are filters
+ if (flatState.filters) {
+ if (flatState.filters.length) {
+ flatState.body.query = {
+ filtered: {
+ query: flatState.body.query,
+ filter: {
+ bool: {
+ must: flatState.filters
+ }
+ }
+ }
+ };
+ }
+ delete flatState.filters;
+ }
+ }
+
+ return flatState;
+ };
+
+ return DataSource;
+});
\ No newline at end of file
diff --git a/src/courier/data_source/doc.js b/src/courier/data_source/doc.js
new file mode 100644
index 00000000000000..4bdfe3d4d4935c
--- /dev/null
+++ b/src/courier/data_source/doc.js
@@ -0,0 +1,181 @@
+define(function (require) {
+ var DataSource = require('courier/data_source/data_source');
+ var inherits = require('utils/inherits');
+ var errors = require('courier/errors');
+ var _ = require('lodash');
+
+ function DocSource(courier, initialState) {
+ DataSource.call(this, courier, initialState);
+ }
+ inherits(DocSource, DataSource);
+
+ /**
+ * Method used by the Courier to fetch multiple DocSource objects at one time.
+ * Those objects come in via the refs array, which is a list of objects containing
+ * at least `source` and `version` keys.
+ *
+ * @param {Courier} courier - The courier requesting the records
+ * @param {array} refs - The list of refs
+ * @param {Function} cb - Callback
+ * @return {undefined}
+ */
+ DocSource.fetch = function (courier, refs, cb) {
+ var client = courier._getClient();
+ var allRefs = [];
+ var body = {
+ docs: []
+ };
+
+ _.each(refs, function (ref) {
+ var source = ref.source;
+ if (source._getType() !== 'doc') return;
+
+ allRefs.push(ref);
+ body.docs.push(source._flatten());
+ });
+
+ return client.mget({ body: body }, function (err, resp) {
+ if (err) return cb(err);
+
+ _.each(resp.docs, function (resp, i) {
+ var ref = allRefs[i];
+ var source = ref.source;
+
+ if (resp.error) return this._error(resp);
+ if (ref.version === resp._version) return; // no change
+ ref.version = resp._version;
+ source._storeVersion(resp._version);
+ source.emit('results', resp);
+ });
+
+ cb(err, resp);
+ });
+ };
+
+ /**
+ * Method used to check if a list of refs is
+ * @param {[type]} courier [description]
+ * @param {[type]} refs [description]
+ * @param {Function} cb [description]
+ * @return {[type]} [description]
+ */
+ DocSource.validate = function (courier, refs, cb) {
+ var invalid = _.filter(refs, function (ref) {
+ var storedVersion = ref.source._getStoredVersion();
+ if (ref.version !== storedVersion) return true;
+ });
+ setTimeout(function () {
+ cb(void 0, invalid);
+ });
+ };
+
+ /*****
+ * PUBLIC API
+ *****/
+
+ /**
+ * List of methods that is turned into a chainable API in the constructor
+ * @type {Array}
+ */
+ DocSource.prototype._methods = [
+ 'index',
+ 'type',
+ 'id',
+ 'sourceInclude',
+ 'sourceExclude'
+ ];
+
+ /**
+ * Applies a partial update to the document
+ * @param {object} fields - The fields to change and their new values (es doc field)
+ * @param {Function} cb - Callback to know when the update is complete
+ * @return {undefined}
+ */
+ DocSource.prototype.update = function (fields, cb) {
+ var source = this;
+ var courier = this._courier;
+ var client = courier._getClient();
+ var state = this._state;
+
+ client.update({
+ id: state.id,
+ type: state.type,
+ index: state.index,
+ body: {
+ doc: fields
+ }
+ }, function (err, resp) {
+ if (err) return cb(err);
+
+ courier._docUpdated(source);
+ return cb();
+ });
+ };
+
+ /*****
+ * PRIVATE API
+ *****/
+
+ /**
+ * Get the type of this DataSource
+ * @return {string} - 'doc'
+ */
+ DocSource.prototype._getType = function () {
+ return 'doc';
+ };
+
+ /**
+ * Used to merge properties into the state within ._flatten().
+ * The state is passed in and modified by the function
+ *
+ * @param {object} state - the current merged state
+ * @param {*} val - the value at `key`
+ * @param {*} key - The key of `val`
+ * @return {undefined}
+ */
+ DocSource.prototype._mergeProp = function (state, val, key) {
+ key = '_' + key;
+
+ if (val != null && state[key] == null) {
+ state[key] = val;
+ }
+ };
+
+ /**
+ * Creates a key based on the doc's index/type/id
+ * @return {string}
+ */
+ DocSource.prototype._versionKey = function () {
+ var state = this._state;
+ return 'DocVersion:' + (
+ [
+ state.index,
+ state.type,
+ state.id
+ ]
+ .map(encodeURIComponent)
+ .join('/')
+ );
+ };
+
+ /**
+ * Fetches the stored version from localStorage
+ * @return {number} - the version number, or NaN
+ */
+ DocSource.prototype._getStoredVersion = function () {
+ var id = this._versionKey();
+ return _.parseInt(localStorage.getItem(id));
+ };
+
+ /**
+ * Stores the version into localStorage
+ * @param {number, NaN} version - the current version number, NaN works well forcing a refresh
+ * @return {undefined}
+ */
+ DocSource.prototype._storeVersion = function (version) {
+ var id = this._versionKey();
+ localStorage.setItem(id, version);
+ };
+
+ return DocSource;
+});
\ No newline at end of file
diff --git a/src/courier/data_source/search.js b/src/courier/data_source/search.js
new file mode 100644
index 00000000000000..ff7f4372213cf4
--- /dev/null
+++ b/src/courier/data_source/search.js
@@ -0,0 +1,129 @@
+define(function (require) {
+ var DataSource = require('courier/data_source/data_source');
+ var inherits = require('utils/inherits');
+ var errors = require('courier/errors');
+ var _ = require('lodash');
+
+ function SearchSource(courier, initialState) {
+ DataSource.call(this, courier, initialState);
+ }
+ inherits(SearchSource, DataSource);
+
+ /**
+ * Method used by the Courier to fetch multiple SearchSource request at a time.
+ * Those objects come in via the refs array, which is a list of objects containing
+ * a `source` keys.
+ *
+ * @param {Courier} courier - The courier requesting the results
+ * @param {array} refs - The list of refs
+ * @param {Function} cb - Callback
+ * @return {undefined}
+ */
+ SearchSource.fetch = function (courier, refs, cb) {
+ var client = courier._getClient();
+ var allRefs = [];
+ var body = '';
+
+ _.each(refs, function (ref) {
+ var source = ref.source;
+ if (source._getType() !== 'search') {
+ return;
+ }
+ allRefs.push(source);
+
+ var state = source._flatten();
+ body +=
+ JSON.stringify({ index: state.index, type: state.type })
+ + '\n'
+ + JSON.stringify(state.body)
+ + '\n';
+ });
+
+ return client.msearch({ body: body }, function (err, resp) {
+ if (err) return cb(err);
+
+ _.each(resp.responses, function (resp, i) {
+ var source = allRefs[i];
+ if (resp.error) return errors.emit(source, courier, resp);
+ source.emit('results', resp);
+ });
+
+ cb(void 0, resp);
+ });
+ };
+
+ /*****
+ * PUBLIC API
+ *****/
+
+ /**
+ * List of the editable state properties that turn into a
+ * chainable API
+ *
+ * @type {Array}
+ */
+ SearchSource.prototype._methods = [
+ 'index',
+ 'type',
+ 'query',
+ 'filter',
+ 'sort',
+ 'highlight',
+ 'aggs',
+ 'from',
+ 'size',
+ 'source',
+ 'inherits'
+ ];
+
+ /******
+ * PRIVATE APIS
+ ******/
+
+ /**
+ * Gets the type of the DataSource
+ * @return {string}
+ */
+ SearchSource.prototype._getType = function () {
+ return 'search';
+ };
+
+ /**
+ * Used to merge properties into the state within ._flatten().
+ * The state is passed in and modified by the function
+ *
+ * @param {object} state - the current merged state
+ * @param {*} val - the value at `key`
+ * @param {*} key - The key of `val`
+ * @return {undefined}
+ */
+ SearchSource.prototype._mergeProp = function (state, val, key) {
+ switch (key) {
+ case 'inherits':
+ case '_type':
+ // ignore
+ return;
+ case 'filter':
+ state.filters = state.filters || [];
+ state.filters.push(val);
+ return;
+ case 'index':
+ case 'type':
+ case 'id':
+ if (key && state[key] == null) {
+ state[key] = val;
+ }
+ return;
+ case 'source':
+ key = '_source';
+ /* fall through */
+ default:
+ state.body = state.body || {};
+ if (key && state.body[key] == null) {
+ state.body[key] = val;
+ }
+ }
+ };
+
+ return SearchSource;
+});
\ No newline at end of file
diff --git a/src/courier/docs.js b/src/courier/docs.js
deleted file mode 100644
index 06b7424c7faaeb..00000000000000
--- a/src/courier/docs.js
+++ /dev/null
@@ -1,135 +0,0 @@
-define(function (require) {
- var _ = require('lodash');
-
- function Docs(courier) {
- // docs that we have let loose, and want to track
- var tracking = {};
- var watchers = {};
-
- function respId(getResp) {
- return [
- encodeURIComponent(getResp._index),
- encodeURIComponent(getResp._type),
- encodeURIComponent(getResp._id)
- ].join('/');
- }
-
- function change(id, updated) {
- if (watchers[id]) {
- var notify = function () {
- var oldVal = tracking[id]._source;
- tracking[id] = _.cloneDeep(update);
- watchers[id].forEach(function (watcher) {
- try {
- watcher(updated, oldVal);
- } catch (e) { console.error(e); }
- });
- };
-
- if (updated) {
- notify();
- } else {
- courier.get('client').get({
-
- });
- }
- }
- }
-
- function track(resp) {
- var id = respId(resp);
- var tracker = _.pick(resp, '_id', '_type', '_index', '_source');
- if (tracking[id] && equal(tracking[id]._source, resp)) return false;
- change(id, resp);
- }
-
- /**
- * add a function to be called when objects matching
- * this resp are changed
- * @param {object} resp - Response like object, should contain _id, _type, and _index keys
- * @param {[type]} onChange - Function to be called when changes are noticed
- */
- function watch(resp, onChange) {
- var id = respId(resp);
- if (!watchers[id]) watchers[id] = [];
- watchers[id].push(onChange);
- }
-
- function get(args, cb, onChange) {
- var client = courier.get('client');
- client.get(args, function (err, getResp) {
- if (err) return cb(err);
- watch(getResp, onChange);
- return cb(void 0, getResp);
- });
- }
-
- function index(args, cb) {
- var client = courier.get('client');
-
- client.index(args, function (err, indexResp) {
- if (err) return cb(err);
- delete indexResp.created;
- indexResp._source = args.body;
- track(indexResp);
- return cb(void 0, indexResp);
- });
- }
-
- function update(args, cb) {
- var client = courier.get('client');
- client.update(args, function (err, updateResp) {
- if (err) return cb(err);
- return cb(void 0, updateResp);
- });
- }
-
- this.watch = watch;
- this.get = get;
- this.index = index;
- this.set = index;
- this.update = update;
- }
-
- function equal(o1, o2) {
- /* jshint eqeqeq:false, forin:false */
- if (o1 === o2) return true;
- if (o1 === null || o2 === null) return false;
- if (o1 !== o1 && o2 !== o2) return true; // NaN === NaN
- var t1 = typeof o1, t2 = typeof o2, length, key, keySet;
- if (t1 == t2) {
- if (t1 == 'object') {
- if (_.isArray(o1)) {
- if (!_.isArray(o2)) return false;
- if ((length = o1.length) == o2.length) {
- for (key = 0; key < length; key++) {
- if (!equal(o1[key], o2[key])) return false;
- }
- return true;
- }
- } else if (_.isDate(o1)) {
- return _.isDate(o2) && o1.getTime() == o2.getTime();
- } else if (_.isRegExp(o1) && _.isRegExp(o2)) {
- return o1.toString() == o2.toString();
- } else {
- if (_.isArray(o2)) return false;
- keySet = {};
- for (key in o1) {
- if (_.isFunction(o1[key])) continue;
- if (!equal(o1[key], o2[key])) return false;
- keySet[key] = true;
- }
- for (key in o2) {
- if (!keySet.hasOwnProperty(key) &&
- o2[key] !== undefined &&
- !_.isFunction(o2[key])) return false;
- }
- return true;
- }
- }
- }
- return false;
- }
-
- return Docs;
-});
\ No newline at end of file
diff --git a/src/courier/errors.js b/src/courier/errors.js
index 1afdd8bc00378c..a12fc3263367c0 100644
--- a/src/courier/errors.js
+++ b/src/courier/errors.js
@@ -1,12 +1,12 @@
-define(function (require) {
+define(function (require, module, exports) {
+ var listenerCount = require('utils/event_emitter').listenerCount;
+
+ // caused by a refresh attempting to start before the prevous is done
function HastyRefresh() {
this.name = 'HastyRefresh';
this.message = 'Courier attempted to start a query before the previous had finished.';
}
HastyRefresh.prototype = new Error();
HastyRefresh.prototype.constructor = HastyRefresh;
-
- return {
- HastyRefresh: HastyRefresh
- };
+ exports.HastyRefresh = HastyRefresh;
});
\ No newline at end of file
diff --git a/src/courier/scratch.js b/src/courier/scratch.js
deleted file mode 100644
index 4401e5b16cbfaf..00000000000000
--- a/src/courier/scratch.js
+++ /dev/null
@@ -1,16 +0,0 @@
-var elasticsearch = require('elasticsearch');
-var es = elasticsearch.Client();
-
-es.msearch({
- body: [
- {
- index: 'logstash-2014.02.1111'
- },
- {
- query: { 'match_all': {} }
- }
- ]
-}, function (err, resp) {
- console.log(resp);
- es.close();
-});
\ No newline at end of file
diff --git a/src/courier/test.html b/src/courier/test.html
index c5deb11bb6e37e..0b542c9dcc1168 100644
--- a/src/courier/test.html
+++ b/src/courier/test.html
@@ -1,3 +1,3 @@
-
{{json}}', controller: function ($rootScope, $scope) { + $scope.count = 0; + var source = $rootScope.dataSource.extend() .type($scope.type) .source({ include: 'country' }) .on('results', function (resp) { + $scope.count ++; $scope.json = JSON.stringify(resp.hits, null, ' '); }); $scope.$watch('type', source.type); + } + }; + }) + .directive('courierDocTest', function () { + return { + restrict: 'E', + scope: { + id: '@', + type: '@', + index: '@' }, - template: '
{{json}}' + template: '{{count}} : :
{{json}}', + controller: function (courier, $scope) { + $scope.count = 0; + + var currentSource; + $scope.click = function () { + if (currentSource) { + source.update(currentSource); + } + }; + + var source = courier.createSource('doc') + .id($scope.id) + .type($scope.type) + .index($scope.index) + .on('results', function (doc) { + currentSource = doc._source; + $scope.count ++; + $scope.json = JSON.stringify(doc, null, ' '); + }); + } }; }); }); \ No newline at end of file diff --git a/src/kibana/services/courier.js b/src/kibana/services/courier.js index 50c4aaeb7d2eb7..38675588624fbc 100644 --- a/src/kibana/services/courier.js +++ b/src/kibana/services/courier.js @@ -1,16 +1,22 @@ define(function (require) { var angular = require('angular'); var Courier = require('courier/courier'); + var DocSource = require('courier/data_source/doc'); + + require('services/promises'); angular.module('kibana/services') - .service('courier', function (es) { + .service('courier', function (es, promises) { + + promises.playNice(DocSource.prototype, [ + 'update', + 'index' + ]); + var courier = new Courier({ fetchInterval: 15000, - client: es - }); - - courier.on('error', function (err) { - console.error(err); + client: es, + promises: promises }); return courier; diff --git a/src/kibana/services/promises.js b/src/kibana/services/promises.js new file mode 100644 index 00000000000000..a41c8280a3a015 --- /dev/null +++ b/src/kibana/services/promises.js @@ -0,0 +1,39 @@ +define(function (require, module, exports) { + var _ = require('lodash'); + var angular = require('angular'); + + angular.module('kibana/services') + .service('promises', function ($q) { + + function playNice(fn, fns) { + if (fns && _.isArray(fns) && _.isObject(fn)) { + fns.forEach(function (method) { + fn[method] = playNice(fn[method]); + }); + return fn; + } + + return function playNiceWrapper() { + // if the last arg is a callback then don't do anything + if (typeof arguments[arguments.length - 1] === 'function') { + return fn.apply(this, arguments); + } + + // otherwise create a callback and pass it in + var args = Array.prototype.slice.call(arguments); + var defer = $q.defer(); + args.push(function (err, result) { + if (err) return defer.reject(err); + defer.resolve(result); + }); + fn.apply(this, args); + return defer.promise; + }; + } + + + return { + playNice: playNice + }; + }); +}); \ No newline at end of file diff --git a/test/unit/specs/courier.js b/test/unit/specs/courier.js index 9e3d3e729e4753..4b755e3c2da419 100644 --- a/test/unit/specs/courier.js +++ b/test/unit/specs/courier.js @@ -2,6 +2,9 @@ define(function (require) { var Courier = require('courier/courier'); var _ = require('lodash'); var sinon = require('sinon/sinon'); + var DataSource = require('courier/data_source/data_source'); + var DocSource = require('courier/data_source/doc'); + var SearchSource = require('courier/data_source/search'); describe('Courier Module', function () { @@ -39,13 +42,13 @@ define(function (require) { it('creates an empty search DataSource object', function () { courier = new Courier(); var source = courier.createSource(); - expect(source._state()).to.eql({ _type: 'search' }); + expect(source._state).to.eql({}); }); it('optionally accepts a type for the DataSource', function () { var courier = new Courier(); - expect(courier.createSource()._state()._type).to.eql('search'); - expect(courier.createSource('search')._state()._type).to.eql('search'); - expect(courier.createSource('get')._state()._type).to.eql('get'); + expect(courier.createSource()).to.be.a(SearchSource); + expect(courier.createSource('search')).to.be.a(SearchSource); + expect(courier.createSource('doc')).to.be.a(DocSource); expect(function () { courier.createSource('invalid type'); }).to.throwError(TypeError); @@ -53,12 +56,12 @@ define(function (require) { it('optionally accepts a json object/string that will populate the DataSource object with settings', function () { courier = new Courier(); var savedState = JSON.stringify({ - _type: 'get', + _type: 'doc', index: 'logstash-[YYYY-MM-DD]', type: 'nginx', id: '1' }); - var source = courier.createSource('get', savedState); + var source = courier.createSource('doc', savedState); expect(source + '').to.eql(savedState); }); }); @@ -83,7 +86,7 @@ define(function (require) { source.on('results', _.noop); source.index('the index name'); - expect(Courier._flattenDataSource(source).index).to.eql('the index name'); + expect(source._flatten().index).to.eql('the index name'); }); }); @@ -111,7 +114,7 @@ define(function (require) { }) .on('results', _.noop); - var query = Courier._flattenDataSource(math); + var query = math._flatten(); expect(query.index).to.eql('people'); expect(query.type).to.eql('students'); expect(query.body).to.eql({ diff --git a/test/unit/specs/data_source.js b/test/unit/specs/data_source.js index 6dc9ee108b7405..f3a717a684f661 100644 --- a/test/unit/specs/data_source.js +++ b/test/unit/specs/data_source.js @@ -1,30 +1,20 @@ define(function (require) { var Courier = require('courier/courier'); - var DataSource = require('courier/data_source'); + var DataSource = require('courier/data_source/data_source'); + var DocSource = require('courier/data_source/doc'); + var SearchSource = require('courier/data_source/search'); describe('DataSource class', function () { var courier = new Courier(); describe('::new', function () { - it('accepts and validates a type', function () { - var source = new DataSource(courier, 'get'); - expect(source._state()._type).to.eql('get'); - - source = new DataSource(courier, 'search'); - expect(source._state()._type).to.eql('search'); - - expect(function () { - source = new DataSource(courier, 'invalid Type'); - }).to.throwError(TypeError); - }); - it('optionally accepts a json object/string that will populate the DataSource object with settings', function () { var savedState = JSON.stringify({ - _type: 'get', + _type: 'doc', index: 'logstash-[YYYY-MM-DD]', type: 'nginx', id: '1' }); - var source = new DataSource(courier, 'get', savedState); + var source = new DocSource(courier, savedState); expect(source + '').to.eql(savedState); }); });