Skip to content

Commit

Permalink
merging in latest changes from master
Browse files Browse the repository at this point in the history
  • Loading branch information
Spencer Alger committed Feb 13, 2014
2 parents 55cd2be + 150ceae commit 86a4d92
Show file tree
Hide file tree
Showing 26 changed files with 12,363 additions and 82 deletions.
1 change: 1 addition & 0 deletions Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = function (grunt) {
src: __dirname + '/src',
app: __dirname + '/src/kibana',
unitTestDir: __dirname + '/test/unit',
testUtilsDir: __dirname + '/test/utils',
meta: {
banner: '/*! <%= package.name %> - v<%= package.version %> - ' +
'<%= grunt.template.today("yyyy-mm-dd") %>\n' +
Expand Down
3 changes: 2 additions & 1 deletion bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"lodash": "~2.4.1",
"d3": "~3.4.1",
"angular-route": "~1.2.12"
}
},
"devDependencies": {}
}
32 changes: 32 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
define(function () {
/** @scratch /configuration/config.js/1
* == Configuration
* config.js is where you will find the core Kibana configuration. This file contains parameter that
* must be set before kibana is run for the first time.
*/

/** @scratch /configuration/config.js/2
* === Parameters
*/
return {

/** @scratch /configuration/config.js/5
* ==== elasticsearch
*
* The URL to your elasticsearch server. You almost certainly don't
* want +http://localhost:9200+ here. Even if Kibana and Elasticsearch are on
* the same host. By default this will attempt to reach ES at the same host you have
* kibana installed on. You probably want to set it to the FQDN of your
* elasticsearch host
*/
elasticsearch: 'http://' + window.location.hostname + ':9200',

/** @scratch /configuration/config.js/5
* ==== kibana-int
*
* The default ES index to use for storing Kibana specific object
* such as stored dashboards
*/
kibanaIndex: 'kibana-int'
};
});
130 changes: 94 additions & 36 deletions src/courier/courier.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
define(function (require) {

var DataSource = require('courier/data_source');
var Docs = require('courier/docs');
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
Expand All @@ -14,9 +15,18 @@ define(function (require) {
};
}

function emitError(source, courier, error) {
if (EventEmitter.listenerCount(source, 'error')) {
source.emit('error', error);
} else {
courier.emit('error', error);
}
}

function mergeProp(state, filters, val, key) {
switch (key) {
case 'inherits':
case '_type':
// ignore
return;
case 'filter':
Expand Down Expand Up @@ -45,7 +55,6 @@ define(function (require) {

// all of the filters from the source chain
var filters = [];

var collectProp = _.partial(mergeProp, state, filters);

// walk the chain and merge each property
Expand Down Expand Up @@ -81,7 +90,7 @@ define(function (require) {
return state;
}

function fetch(client, sources, cb) {
function fetchSearchResults(courier, client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
Expand All @@ -90,6 +99,9 @@ define(function (require) {
var all = [];
var body = '';
_.each(sources, function (source) {
if (source.getType() !== 'search') {
return;
}
all.push(source);

var state = flattenDataSource(source);
Expand All @@ -106,13 +118,58 @@ define(function (require) {
if (err) return cb(err);

_.each(resp.responses, function (resp, i) {
sources[i].emit('results', resp);
var source = sources[i];
if (resp.error) return emitError(source, courier, resp);
source.emit('results', resp);
});

cb(err, resp);
});
}

function fetchDocs(courier, client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
}

var all = [];
var body = {
docs: []
};

_.each(sources, function (source) {
if (source.getType() !== 'get') {
return;
}

all.push(source);

var state = flattenDataSource(source);
body.docs.push({
index: state.index,
type: state.type,
id: state.id
});
});

return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);

_.each(resp.responses, function (resp, i) {
var source = sources[i];
if (resp.error) return emitError(source, courier, resp);
source.emit('results', resp);
});

cb(err, resp);
});
}

function saveUpdate(source, fields) {

}

/**
* Federated query service, supports data sources that inherit properties
* from one another and automatically emit results.
Expand All @@ -127,10 +184,13 @@ define(function (require) {
};
var fetchTimer;
var activeRequest;
var courier = this;
var sources = {
search: [],
get: []
};

var sources = [];

function doFetch() {
function doSearch() {
if (!opts.client) {
this.emit('error', new Error('Courier does not have a client, pass it ' +
'in to the constructor or set it with the .client() method'));
Expand All @@ -144,7 +204,7 @@ define(function (require) {
}

// we need to catch the original promise in order to keep it's abort method
activeRequest = fetch(opts.client, sources, function (err, resp) {
activeRequest = fetchSearchResults(courier, opts.client, sources.search, function (err, resp) {
activeRequest = null;
setFetchTimeout();

Expand All @@ -157,61 +217,59 @@ define(function (require) {
function setFetchTimeout() {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(doFetch, opts.fetchInterval);
fetchTimer = setTimeout(doSearch, opts.fetchInterval);
} else {
fetchTimer = null;
}
}

function stopFetching() {
function stopFetching(type) {
clearTimeout(fetchTimer);
}

function startFetchingSource(source) {
var existing = _.find(sources, { source: source });
if (existing) return false;

sources.push(source);
// start using a DataSource in fetches/updates
function openDataSource(source) {
var type = source.getType();
if (~sources[type].indexOf(source)) return false;
sources[type].push(source);
}

function stopFetchingSource(source) {
var i = sources.indexOf(source);
if (i !== -1) {
sources.slice(i, 1);
}
if (sources.length === 0) stopFetching();
// stop using a DataSource in fetches/updates
function closeDataSource(source) {
var type = source.getType();
var i = sources[type].indexOf(source);
if (i === -1) return;
sources[type].slice(i, 1);
// only search DataSources get fetched automatically
if (type === 'search' && sources.search.length === 0) stopFetching();
}

// is there a scheduled request?
function isStarted() {
// has the courier been started?
function isRunning() {
return !!fetchTimer;
}

// chainable public api
this.isStarted = chain(this, isStarted);
this.start = chain(this, doFetch);
this.startFetchingSource = chain(this, startFetchingSource);
this.start = chain(this, doSearch);
this.running = chain(this, isRunning);
this.stop = chain(this, stopFetching);
this.stopFetchingSource = chain(this, stopFetchingSource);
this.close = chain(this, function stopFetchingAllSources() {
_.each(sources, stopFetchingSource);
});
this.close = chain(this, function () { _(sources.search).each(closeDataSource); });
this.openDataSource = chain(this, openDataSource);
this.closeDataSource = chain(this, closeDataSource);

// setter
// setters
this.client = chain(this, function (client) {
opts.client = client;
});

// setter/getter
this.fetchInterval = function (val) {
opts.fetchInterval = val;
if (isStarted()) setFetchTimeout();
if (isRunning()) setFetchTimeout();
return this;
};

// factory
this.createSource = function (state) {
return new DataSource(this, state);
this.createSource = function (type, initialState) {
return new DataSource(this, type, initialState);
};

// apply the passed in config
Expand All @@ -220,10 +278,10 @@ define(function (require) {
this[key](val);
}, this);
}
inherits(Courier, EventEmitter);

// private api, exposed for testing
Courier._flattenDataSource = flattenDataSource;
inherits(Courier, EventEmitter);

return Courier;
});
61 changes: 43 additions & 18 deletions src/courier/data_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,30 @@ define(function (require) {
}
}

var optionNames = [
'index',
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source',
'inherits'
];
var apiMethods = {
search: [
'index',
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source',
'inherits'
],
get: [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
]
};

function DataSource(courier, initialState) {
function DataSource(courier, type, initialState) {
var state;

if (initialState) {
Expand All @@ -38,23 +47,36 @@ define(function (require) {
} else {
state = _.cloneDeep(initialState);
}
if (state._type) {
if (type && type !== state._type) {
throw new Error('Initial state is not of the type specified for this DataSource');
} else {
type = state._type;
}
}
} else {
state = {};
}

type = type || 'search';
if (!_.has(apiMethods, type)) {
throw new TypeError('Invalid DataSource type ' + type);
}
state._type = type;

var mapper = new Mapper();

var onNewListener = _.bind(function (name) {
// new newListener is emitted before it is added, count will be 0
if (name !== 'results' || listenerCount(this, 'results') !== 0) return;
courier.startFetchingSource(this);
courier.openDataSource(this);
this.removeListener('newListener', onNewListener);
this.on('removeListener', onRemoveListener);
}, this);

var onRemoveListener = _.bind(function () {
if (listenerCount(this, 'results') > 0) return;
courier.stopFetchingSource(this);
courier.closeDataSource(this);
this.removeListener('removeListener', onRemoveListener);
this.on('newListener', onNewListener);
}, this);
Expand Down Expand Up @@ -82,12 +104,15 @@ define(function (require) {
return _.keys(mapping);
});
};
this.getType = function () {
return state._type;
};
this.extend = function () {
return courier.createSource().inherits(this);
return courier.createSource(type).inherits(this);
};

// get/set internal state values
optionNames.forEach(function (name) {
apiMethods[type].forEach(function (name) {
this[name] = function (val) {
state[name] = val;
if (name === 'index' && arguments[1]) {
Expand Down
Loading

0 comments on commit 86a4d92

Please sign in to comment.