samtecspg / conveyor
Easy way to get structured stuff into Elasticsearch (CSV, MSSQL, API)
License: Apache License 2.0
@malave can you write a short developer doc? Primarily trying to answer a few questions:
For the moment you can add it onto development.md
The Kibana plugin helper is not minifying the build files as it operates in developer mode.
Re-thinking this task a bit, we need to design a better way to store parameters than just a key, value array. The way we store parameters needs to allow:
Goal being that all communication should go through the Ingest API for channel creation. Part of channel creation could be defining a mapping for the index.
Issue: Currently the ES index and documents (for flow and flowTemplate) are being created dynamically when we create the first documents.
The flowTemplate causes errors when the channel-sources container runs, since we have documents with different structures; in this case the parameter attribute in the flowTemplate could have a different structure if it is a list.
Solution 1: as the title says, initialize the mapping when the API is first run.
Solution 2: Store data in a different and simpler way (some in-memory DB that dumps to disk regularly that we can embed, or something else). The API (the hapijs section) doesn't really need to access ES since its only function is to save and retrieve flow and flowTemplate, and a small additional advantage would be not waiting for ES to finish indexing some data that we just modified before we can query it again.
More to come
As of Aug 1 ingest flows are hard-coded, and there is no UI-based way to create new ones.
A dynamic system needs to be developed.
After much pulling of hair (and searching excellence by @wrathagom) the following background was found:
Some goals of the UI:
When the help text is too long it moves the subsequent group elements down the page instead of overflowing.
Luis mentioned redesigning this to float above the grid as opposed to being part of the grid.
The name should be unique, and when a POST is done with the same name it should update the old one and generate a new version number.
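A sketch of how that create-vs-versioned-update decision could look, assuming the existing document (looked up by name) carries an `id` and a `version` field (the shapes here are assumptions for illustration):

```javascript
'use strict';

// Given the existing template with the same name (or null if none) and the
// incoming POST body, decide whether this is a create or a versioned update.
function resolveVersion(existing, incoming) {
  if (!existing) {
    return { action: 'create', version: 1, body: incoming };
  }
  return {
    action: 'update',
    id: existing.id,                      // overwrite the old document in place
    version: (existing.version || 1) + 1, // bump the version number
    body: incoming
  };
}
```

The route handler would first GET by name, feed the result through `resolveVersion`, and then index with the returned `id`/`version`.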
Since we moved away from stringifying the flow before uploading I can no longer use boolean parameter types. I have to put the templating syntax in a string to be valid JSON:
{
    "id": "{{_id}}-18",
    "type": "http request",
    "name": "",
    "method": "POST",
    "ret": "{{PARAMETER}}",
    "url": "elasticsearch:9200/{{_id}}/_delete_by_query",
    "tls": "",
    "x": 630,
    "y": 140,
    "wires": [
        [
            "{{_id}}-19"
        ]
    ]
}
But this fails because "false" doesn't equal false. :) We either need to move back to stringifying (which was always problematic for me, though I don't remember why), or figure out a new solution.
So this would be a new parameter type, though it would behave a little differently. Rather than exposing fields to the flow, it would POST the content of the file to /{{_url}}/data after the flow has been created.
Let me know if there is more architecting needed.
Create a template, but one that won't work; here I forgot to include the label and nodes upper-level object:
curl --request POST \
--url http://localhost:4000/flowTemplate \
--header 'content-type: application/json' \
--data '{\n "name": "csv-upload",\n "description": "A channel template that allows uploading a CSV file to populate ES with data. A unique field can be specified allowing future uploads to update rows.",\n "parameters": ["channelName", "channelEndpoint", "uniqueField"],\n "flow": "[{\"type\":\"tab\",\"label\":\"{{channelName}}\"},{\"id\":\"{{channelName}}-1-{{_id}}\",\"type\":\"http in\",\"name\":\"\",\"url\":\"/flow/{{channelEndpoint}}\",\"method\":\"post\",\"swaggerDoc\":\"\",\"x\":130,\"y\":100,\"wires\":[[\"{{channelName}}-3-{{_id}}\"]]},{\"id\":\"{{channelName}}-2-{{_id}}\",\"type\":\"function\",\"name\":\"Split Data\",\"func\":\"var newMsg = {\\n payload: msg.payload.fileInfo\\n}\\nmsg.payload = msg.payload.data;\\n\\nreturn [newMsg, msg];\",\"outputs\":\"2\",\"noerr\":0,\"x\":700,\"y\":100,\"wires\":[[\"{{channelName}}-4-{{_id}}\"],[\"{{channelName}}-5-{{_id}}\"]]},{\"id\":\"{{channelName}}-4-{{_id}}\",\"type\":\"debug\",\"name\":\"\",\"active\":true,\"console\":\"false\",\"complete\":\"false\",\"x\":870,\"y\":60,\"wires\":[]},{\"id\":\"{{channelName}}-6-{{_id}}\",\"type\":\"comment\",\"name\":\"Future Enhancement\",\"info\":\"In the future I want to wite back the most recent file info to ES\",\"x\":900,\"y\":20,\"wires\":[]},{\"id\":\"{{channelName}}-3-{{_id}}\",\"type\":\"function\",\"name\":\"Config Check\",\"func\":\"var requiredConfigs = ['\''ES_AUTH'\'', '\''ES_HOST'\'', '\''ES_PORT'\''];\\nvar missing = false\\nvar missingConfigs = [];\\n\\nfor (var i=0; i<requiredConfigs.length; i++) {\\n if (!global.get(requiredConfigs[i])) {\\n missing = true;\\n missingConfigs.push(requiredConfigs[i]);\\n }\\n}\\n\\nif (missing === false) {\\n return [msg, null]\\n} else {\\n msg.payload = {\\n error: '\''Missing a required global configuration.'\'',\\n missingConfigurations: missingConfigs\\n }\\n \\n return [null, 
msg]\\n}\",\"outputs\":\"2\",\"noerr\":0,\"x\":340,\"y\":100,\"wires\":[[\"{{channelName}}-7-{{_id}}\"],[\"{{channelName}}-8-{{_id}}\"]]},{\"id\":\"{{channelName}}-7-{{_id}}\",\"type\":\"function\",\"name\":\"Data Check\",\"func\":\"if (msg.payload.data && msg.payload.data.length > 0) {\\n return [msg, null]\\n} else {\\n msg.payload = {\\n error: '\''No Data'\''\\n }\\n return [null, msg]\\n}\",\"outputs\":\"2\",\"noerr\":0,\"x\":530,\"y\":100,\"wires\":[[\"{{channelName}}-2-{{_id}}\"],[\"{{channelName}}-9-{{_id}}\"]]},{\"id\":\"{{channelName}}-8-{{_id}}\",\"type\":\"link out\",\"name\":\"Response\",\"links\":[\"{{channelName}}-10-{{_id}}\"],\"x\":475,\"y\":140,\"wires\":[]},{\"id\":\"{{channelName}}-9-{{_id}}\",\"type\":\"link out\",\"name\":\"Response\",\"links\":[\"{{channelName}}-10-{{_id}}\"],\"x\":655,\"y\":140,\"wires\":[]},{\"id\":\"{{channelName}}-10-{{_id}}\",\"type\":\"link in\",\"name\":\"Response Gathering\",\"links\":[\"{{channelName}}-8-{{_id}}\",\"{{channelName}}-9-{{_id}}\",\"{{channelName}}-11-{{_id}}\",\"{{channelName}}-12-{{_id}}\",\"{{channelName}}-13-{{_id}}\"],\"x\":55,\"y\":320,\"wires\":[[\"{{channelName}}-14-{{_id}}\"]]},{\"id\":\"{{channelName}}-14-{{_id}}\",\"type\":\"http response\",\"name\":\"\",\"x\":150,\"y\":320,\"wires\":[]},{\"id\":\"{{channelName}}-15-{{_id}}\",\"type\":\"function\",\"name\":\"Validate ID\",\"func\":\"if (msg.payload.{{uniqueField}} && msg.payload.{{uniqueField}} !== '\'''\'') {\\n return [msg, null]\\n} else {\\n msg.payload = {\\n error: '\''Unique Field not found'\''\\n }\\n return [null, msg]\\n}\",\"outputs\":\"2\",\"noerr\":0,\"x\":1009,\"y\":123,\"wires\":[[\"{{channelName}}-16-{{_id}}\"],[\"{{channelName}}-11-{{_id}}\"]]},{\"id\":\"{{channelName}}-11-{{_id}}\",\"type\":\"link out\",\"name\":\"Response\",\"links\":[\"{{channelName}}-10-{{_id}}\"],\"x\":1135,\"y\":180,\"wires\":[]},{\"id\":\"{{channelName}}-16-{{_id}}\",\"type\":\"link 
out\",\"name\":\"\",\"links\":[\"{{channelName}}-17-{{_id}}\"],\"x\":1135,\"y\":100,\"wires\":[]},{\"id\":\"{{channelName}}-17-{{_id}}\",\"type\":\"link in\",\"name\":\"\",\"links\":[\"{{channelName}}-16-{{_id}}\"],\"x\":55,\"y\":200,\"wires\":[[\"{{channelName}}-18-{{_id}}\"]]},{\"id\":\"{{channelName}}-18-{{_id}}\",\"type\":\"function\",\"name\":\"Build Query\",\"func\":\"// var oldMessage = msg;\\n// msg = {\\n// payload: oldMessage.payload,\\n// oldMessage: oldMessage\\n// };\\n\\nvar ES_AUTH = global.get('\''ES_AUTH'\'');\\nvar ES_HOST = global.get('\''ES_HOST'\'');\\nvar ES_PORT = global.get('\''ES_PORT'\'');\\n\\nmsg.method = '\''PUT'\''\\nmsg.url = '\''http://'\'' + ES_AUTH + '\''@'\'' + ES_HOST + '\'':'\'' + ES_PORT + '\''/user-info/default/'\'' + msg.payload['\''{{uniqueField}}'\'']\\n\\nreturn msg;\",\"outputs\":1,\"noerr\":0,\"x\":170,\"y\":200,\"wires\":[[\"{{channelName}}-19-{{_id}}\"]]},{\"id\":\"{{channelName}}-19-{{_id}}\",\"type\":\"http request\",\"name\":\"\",\"method\":\"use\",\"ret\":\"obj\",\"url\":\"\",\"tls\":\"\",\"x\":350,\"y\":200,\"wires\":[[\"{{channelName}}-20-{{_id}}\"]]},{\"id\":\"{{channelName}}-20-{{_id}}\",\"type\":\"join\",\"name\":\"\",\"mode\":\"auto\",\"build\":\"string\",\"property\":\"payload\",\"propertyType\":\"msg\",\"key\":\"topic\",\"joiner\":\"\\\\n\",\"timeout\":\"\",\"count\":\"\",\"x\":510,\"y\":200,\"wires\":[[\"{{channelName}}-21-{{_id}}\"]]},{\"id\":\"{{channelName}}-5-{{_id}}\",\"type\":\"split\",\"name\":\"\",\"splt\":\"\\\\n\",\"x\":850,\"y\":107,\"wires\":[[\"{{channelName}}-15-{{_id}}\"]]},{\"id\":\"{{channelName}}-21-{{_id}}\",\"type\":\"function\",\"name\":\"\",\"func\":\"delete(msg.method);\\ndelete(msg.url);\\ndelete(msg.responseUrl);\\ndelete(msg.headers);\\n\\nfor (var i=0; i< msg.payload.length; i++) {\\n if (msg.payload[i].error) {\\n msg.statusCode = 400\\n }\\n}\\n\\nreturn 
msg;\",\"outputs\":1,\"noerr\":0,\"x\":698,\"y\":199,\"wires\":[[\"{{channelName}}-13-{{_id}}\"]]},{\"id\":\"{{channelName}}-13-{{_id}}\",\"type\":\"link out\",\"name\":\"\",\"links\":[\"{{channelName}}-10-{{_id}}\"],\"x\":819,\"y\":198,\"wires\":[]}]"\n}'
Next, try to create a flow from that template; be sure to copy the right _id from the above command.
curl --request POST \
--url http://localhost:4000/flow \
--header 'content-type: application/json' \
--data '{\n "templateId": "AVxhu3uoK8OHYm5LmJ0s",\n "name": "User Info Channel",\n "description": "The Global Master Phone List joined with some other useful information and uploaded occasionally.",\n "parameters": [\n {\n "key": "channelName",\n "value": "userinfo"\n },\n {\n "key": "channelEndpoint",\n "value": "userInfo"\n },\n {\n "key": "uniqueField",\n "value": "userId"\n }\n ]\n}'
The flow will be created in ES, but the API will return an error message and the flow will not exist in node-red.
So all of the flows/flowtemplates will likely have HTTP endpoints associated with them. For posting data/configuration/etc.
I would like for the requests to be proxied by Hapi to node-red. Such that I don't have to POST/GET to two different places.
So let's say we create the Sample Executions channel. It has an endpoint for posting data. I would like to POST data to localhost:4000/channel/sample-executions/ as opposed to localhost:1880/sample-executions. We control the endpoints to a certain extent by having the templates be what we need them to be. The most important thing is to find a way to proxy the ones the API doesn't pick up.
I imagine Hapi could do this or both could be put behind nginx... dunno which is better.
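Either way (hapi's h2o2 proxy handler or nginx would do the actual forwarding), the part we'd own is the URL rewrite. A sketch of that mapping, with the host/port and path shape as assumptions:

```javascript
'use strict';

// Map an incoming API URL onto the node-red endpoint it should be proxied
// to: /channel/<name>/<rest> -> <nodeRedBase>/<name>/<rest>.
// Returns null for non-channel routes so hapi can handle them normally.
function toNodeRedUrl(apiPath, nodeRedBase) {
  const m = /^\/channel\/([^/]+)(\/.*)?$/.exec(apiPath);
  if (!m) return null;
  return nodeRedBase + '/' + m[1] + (m[2] || '');
}
```

A catch-all hapi route for `/channel/{p*}` could call this and hand the result to a proxy handler, which would cover any endpoint a template defines without the API having to know about it in advance.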
The current implementation of ingest requires that a standalone node-red be run in addition to ingest. The project would feel lighter-weight if node-red were directly integrated into ingest via the node-red library: https://nodered.org/docs/embedding
Right now there is a missing parameter. I can see that in the console output, but the returned error just states Invalid Flow Template Id or Parameter array supplied. If we know that a parameter is missing we should return that to the user.
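A sketch of how the API could name the missing parameters in its error response: scan the template string for `{{placeholder}}` tokens and diff them against the supplied parameter array (skipping built-ins like `_id`, which the API fills in itself — that skip rule is an assumption):

```javascript
'use strict';

// Return the list of template placeholders that the caller did not supply.
// supplied is assumed to be the [{ key, value }] array the /flow endpoint
// already accepts.
function findMissingParameters(templateStr, supplied) {
  const needed = new Set();
  const re = /\{\{\s*([A-Za-z0-9_]+)\s*\}\}/g;
  let m;
  while ((m = re.exec(templateStr)) !== null) {
    // Assumed convention: leading-underscore tokens are API built-ins.
    if (!m[1].startsWith('_')) needed.add(m[1]);
  }
  const have = new Set(supplied.map((p) => p.key));
  return [...needed].filter((k) => !have.has(k));
}
```

The handler could then return something like `{ error: 'Missing parameters', missing: [...] }` instead of the generic message.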
From @wrathagom:
We also need to create an issue / discuss this. The idea was to give an interface as the final step of channel creation where data could be piped in temporarily. From that we could glean field mappings and provide a way to override the defaults. We could also create pretty field names and descriptions. Some stuff like uniqueness may not matter to ES, but having the extra field meta-data will help the natural language understanding piece later.
I can successfully create one flow from a flow template, but as soon as I create the second flow I get the below error.
ingest_1 | null
ingest_1 | Error: [object Object]
ingest_1 | at callback (/usr/src/app/modules/flow/controllers/add.flow.controller.js:23:29)
ingest_1 | at Async.waterfall (/usr/src/app/models/flow.model.js:92:24)
ingest_1 | at /usr/src/app/node_modules/async/dist/async.js:421:16
ingest_1 | at next (/usr/src/app/node_modules/async/dist/async.js:5279:29)
ingest_1 | at /usr/src/app/node_modules/async/dist/async.js:906:16
ingest_1 | at NodeRED.flow.save (/usr/src/app/models/flow.model.js:85:21)
ingest_1 | at wreck.post (/usr/src/app/datasources/node-red.ds.js:20:28)
ingest_1 | at read (/usr/src/app/node_modules/wreck/lib/index.js:523:20)
ingest_1 | at finish (/usr/src/app/node_modules/wreck/lib/index.js:369:16)
ingest_1 | at wrapped (/usr/src/app/node_modules/hoek/lib/index.js:871:20)
The ES record gets created, but nothing shows up in Node Red.
In my opinion, having to run specific commands before running a test and having to put things in the .env file for the test seems too difficult. I am thinking that tests should be written to self-build, of a sort.
For example if you POST a flowtemplate as your first test then a GET would be able to succeed.
If that's not acceptable, Hapi Lab allows executing commands before a test is actually run. One or both of these capabilities should be used to make the tests simpler to run.
const Code = require('code');
const Lab = require('lab');
const lab = exports.lab = Lab.script();

lab.experiment('math', () => {

    lab.before((done) => {

        // Wait 1 second
        setTimeout(() => {

            done();
        }, 1000);
    });

    lab.beforeEach((done) => {

        // Run before every single test
        done();
    });

    lab.test('returns true when 1 + 1 equals 2', (done) => {

        Code.expect(1 + 1).to.equal(2);
        done();
    });
});
As Abe designs the Conveyor Plug-in UI a new piece of functionality has come up for discussion.
Conveyor is all about making it easier to get data into Kibana, and to that narrative we want to drop the user onto a page that has two massive buttons: Upload File and New Conveyor.
The New Conveyor button will send them through the traditional flow of selecting the type of conveyor they want to create and filling out the parameters.
The Upload File option is all about ease and speed, so we need to discuss the best way to accomplish that.
The big question is: What do we want to happen when the user uploads a file?
More discussion to come.
This should show up as a toggle at the top of the Create screen where the user can toggle between basic and advanced. When on basic, all of the groups labeled as advanced are hidden, but still active in that their values will still be used to create the flow. Toggling from advanced to basic should reset the advanced options.
I am thinking that in the creation of templates, it will be easiest to work with them in the JSON format. That means that stringifying them before POSTing is an unnecessary pain. It also prevents top level validation.
So please modify the channelTemplate endpoint to accept a JSON flow and stringify it on the server side. Also validate the top level, at least that it has a label and a nodes array.
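A sketch of those two steps (in the real route this would probably live in the Joi payload schema; the plain function below just shows the checks, and the payload shape is an assumption):

```javascript
'use strict';

// Validate the top level of an incoming template payload, then stringify
// the flow so the storage format stays unchanged. Returns { error } or
// { value } in the usual hapi-ish shape.
function validateAndStringifyTemplate(payload) {
  const flow = payload.flow;
  if (flow === null || typeof flow !== 'object' || Array.isArray(flow)) {
    return { error: '"flow" must be a JSON object' };
  }
  if (typeof flow.label !== 'string' || flow.label.length === 0) {
    return { error: '"flow.label" is required' };
  }
  if (!Array.isArray(flow.nodes)) {
    return { error: '"flow.nodes" must be an array' };
  }
  // Stringify on the server side; the rest of the pipeline is untouched.
  return { value: Object.assign({}, payload, { flow: JSON.stringify(flow) }) };
}
```

This would have caught the broken template above (the one missing the label and nodes upper-level object) at POST time.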
Just like we proxy /data, I want to proxy /configs, which will be used to manage flow configurations. Some flows will have a lot of these, some probably will have none. But I want this flow-configuration endpoint to exist on all flows.
Just feel like it's worth saying so that no one misreads it: the :name in the above image is the name of the configuration parameter, not of the flow. The name of the flow would replace TEST_URL.
When there is an error, one of the metrics on the list will be undefined. We should capture the metrics even when an error occurs.
Debug: internal, implementation, error
TypeError: Cannot read property 'export' of undefined
at _.forEach (/Users/malave/code/spg/api/alpha-ingest-api/api/plugins/metrics-logger.plugin.js:55:31)
at arrayEach (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/lodash/lodash.js:537:11)
at Function.forEach (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/lodash/lodash.js:9359:14)
at tailHandler (/Users/malave/code/spg/api/alpha-ingest-api/api/plugins/metrics-logger.plugin.js:53:11)
at invoke (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/podium/lib/index.js:239:30)
at each (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/podium/lib/index.js:243:13)
at Object.exports.parallel (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/items/lib/index.js:70:13)
at Object.internals.emit (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/podium/lib/index.js:260:18)
at module.exports.internals.Plugin.internals.Podium._emit (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/podium/lib/index.js:140:15)
at each (/Users/malave/code/spg/api/alpha-ingest-api/api/node_modules/podium/lib/index.js:181:47)
When there is an error with a Node-RED flow (channel) how do we pass that to the interface?
Expose flows.json as an external volume
As Luis was defining the flow parameters and parameter groups, it occurred to me that static lists of options weren't the most helpful. The most immediate solution I have is creating the flow incrementally.
Essentially, each parameter group would have an index and they would have to be completed in order. It would also, optionally, have a flow all to itself. This miniature flow would get created once that parameter group was completed. This would allow step 1 to authenticate with a database and create an API endpoint for the UI to call in step 2 to populate a field.
This is a necessary modification to enhance usability, but isn't necessary right away.
Let's create an index in ES where we save off every request coming into the ingest API, including the proxied requests, with detailed time information: when the request came in, how long it took, and its response code.
Sounds similar to NGINX logs, with the exception that we track the total_response_time and the proxy_response_time.
End goal would be the ability to GET /request?sort=-total_response_time to see which requests are taking the longest and identify slow flows.
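A sketch of the document we'd index per request, assuming timestamps captured at receive/respond time and around the proxy hop (the field names beyond total_response_time and proxy_response_time are assumptions):

```javascript
'use strict';

// Build the ES document for one request.
// info: { method, path, receivedAt, respondedAt, proxyStartedAt,
//         proxyEndedAt, statusCode } -- timestamps in ms since epoch.
function buildRequestLogDoc(info) {
  const doc = {
    method: info.method,
    path: info.path,
    '@timestamp': new Date(info.receivedAt).toISOString(),
    status_code: info.statusCode,
    total_response_time: info.respondedAt - info.receivedAt
  };
  // Only proxied requests carry proxy timing.
  if (info.proxyStartedAt != null && info.proxyEndedAt != null) {
    doc.proxy_response_time = info.proxyEndedAt - info.proxyStartedAt;
  }
  return doc;
}
```

In hapi this could hang off the response/tail events the metrics-logger plugin already listens to, with the doc indexed fire-and-forget so logging never slows the request itself.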
So I believe that there are going to be certain variables that we may want to share at the global context level in Node-RED. A few examples of these would be the Elasticsearch URL, port, and authentication string: similar to environment variables, but something that lives in ES and ideally is configurable.
If we go this route we'll also need a way to bring up Node-RED with this flow pre-existing, and we may need some management of what is required/etc. in the flow template configuration...
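A sketch of assembling those values (the ES_AUTH / ES_HOST / ES_PORT names match what the Config Check node in the csv-upload template already reads from global context; the defaults are assumptions):

```javascript
'use strict';

// Assemble the values that should be visible in Node-RED's global context.
// In a real setup these could be fetched from ES instead of process.env;
// the defaults below are illustrative assumptions.
function buildGlobalContext(env) {
  return {
    ES_HOST: env.ES_HOST || 'elasticsearch',
    ES_PORT: env.ES_PORT || '9200',
    ES_AUTH: env.ES_AUTH || 'elastic:changeme'
  };
}
```

If we embed Node-RED, this object could be passed as the documented functionGlobalContext setting at init time; storing the values in ES instead would just mean fetching them before Node-RED starts (or re-seeding the context when they change).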
Some flows (one based on SQL, for example) won't have their dependencies installed by default in Node-RED. We should provide a way of specifying dependencies when a flow template is created, then checking whether they are installed and installing them if necessary.
Post findings here before coding anything.
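For the "check" half, one starting point: Node-RED's admin HTTP API can list installed node modules (GET /nodes), so the API could diff a template's declared dependencies against that list before creating the flow. The response shape assumed below ({ module, enabled }) is a simplification:

```javascript
'use strict';

// Compare a template's declared dependencies against the node modules
// Node-RED reports as installed. `installed` is assumed to be the parsed
// list from the admin API, reduced to { module, enabled } entries.
function missingDependencies(required, installed) {
  const ok = new Set(
    installed.filter((n) => n.enabled !== false).map((n) => n.module)
  );
  return required.filter((mod) => !ok.has(mod));
}
```

Anything returned here would then be installed (Node-RED's admin API also supports installing modules) before the flow is deployed.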
Getting this error
{
"statusCode": 400,
"error": "Bad Request",
"message": "child \"template\" fails because [\"template\" is required]",
"validation": {
"source": "payload",
"keys": [
"template"
]
}
}
I am just going with the new name of connector for the time being. I think it's a bit cheeky.
Luis, we briefly mentioned this last week, but on October 1st in our timeline we said we would "release" ingest. As part of that release we've decided that we want to build a Kibana UI for it. We think of ingest as a tool to get data into ES, but from a data perspective that is just Elasticsearch, and we think the community would be very interested in a tool for getting data (from almost any source) into Elasticsearch to take advantage of its visualization tools, ML tools, and alerting/reporting capabilities.
So for this week, I'd like you to spend it creating a feature list
for a product that meets the above description.
Just adding a few items here, will add/remove later when I give it some more thought:
- api/models/flow.model.js and api/models/flow-template.model.js to have a similar structure
- package.json description and tags for better SEO (or add to to-do list)
- url or what we decide to use in #32

We should be able to enable/disable sub-groups of parameters. This will also affect validations, since these parameters, even if they are required when enabled, will be optional when disabled.