Record indexer for Invenio.
Home Page: https://invenio-indexer.readthedocs.io
License: MIT License
Further documentation is available on https://invenio-indexer.readthedocs.io/
Add the possibility to specify the parent id and parent document type during indexing.
At the moment the index reindex and index run commands don't respect the record_cls when indexing records. For record classes with separate tables, indexing is not possible this way.
The best solution would be to get the record class back from the uuid: get the pid_type from the PersistentIdentifier (PersistentIdentifier.query.filter_by(object_uuid=uuid).one().pid_type) and use this information to find the record class in config.py. Alternatively, we could transport this information in the payload, because bulk_index_task uses the RecordIndexer to index the uuids without having the correct definition of record_cls.
We have to use the correct record class in the api.py file in lines 280 and 298.
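A minimal sketch of the config-based lookup described above. The mapping name RECORD_CLASSES and the import paths below are assumptions for illustration, not existing invenio-indexer configuration:

```python
# Hypothetical config mapping from pid_type to record class import path.
RECORD_CLASSES = {
    'recid': 'invenio_records.api:Record',
    'docid': 'mymodule.api:Document',
}

def record_class_path(pid_type):
    """Resolve the record class import path for a given pid_type,
    falling back to the default Record class."""
    return RECORD_CLASSES.get(pid_type, 'invenio_records.api:Record')

# The pid_type itself would be obtained as described above:
#   PersistentIdentifier.query.filter_by(object_uuid=uuid).one().pid_type
```

The CLI and bulk_index_task could then resolve the right class per uuid instead of always using the default Record.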
This issue stems from https://github.com/inveniosoftware/invenio-cli potentially re-running invenio index init multiple times under the covers and not wanting to throw an exception when the index is already created.
Like Ansible, invenio-cli's goal (and by extension these CLI commands' goal) is to reach a state rather than perform a certain action.
Add elasticsearch6 in setup.py's extras (pointing to invenio-search[elasticsearch6]).
Create utility class which does bulk indexing by default.
The reindex command assumes that all records in the database can be indexed. This assumption only holds for simple usage and doesn't cover e.g. cases where you have a deleted persistent identifier with a non-deleted record (e.g. to store a removal reason).
Also, the reindex command does not remove records from the index which no longer exist in the database, so it possibly won't bring your index into sync with your database, which is usually what you would need the command to do.
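The missing cleanup step could be sketched like this (a simple set difference; the function name is hypothetical, and a real implementation would scroll over the index rather than hold all ids in memory):

```python
def stale_index_ids(db_ids, index_ids):
    """Return ids that are still present in the index but no longer in the
    database; these should be deleted during reindex to bring the index
    back in sync with the database."""
    return set(index_ids) - set(db_ids)
```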
Shall we rely on the Celery version defined in Flask-CeleryExt? Currently, it causes problems in many packages that depend on Invenio-Indexer and do not specify Flask-CeleryExt as a direct dependency.
When indexing, the revision_id of the record is sent directly:

return self.client.index(
    id=str(record.id),
    version=record.revision_id,
    version_type=self._version_type,
    index=index,
    doc_type=doc_type,
    body=self._prepare_record(record, index, doc_type),
)
It is true that Elasticsearch will crosscheck that, but if an update arrives out of order or is somehow tampered with (e.g. by signals), it will throw an exception.
Would it be good to add a check and avoid the indexing error, maybe even potentially rolling back the DB commit in invenio-records-rest to avoid an inconsistent state?
This last question might apply not only to this case but in general: check the indexing result and roll back if needed (although this should be in invenio-records-rest).
Problem
Without changing the JSONSchema you may want to improve the Elasticsearch mapping, especially if you, say, add computed fields. This requires you to create a new index with the new mapping and reindex all records into the new index.
Right now, you would have to destroy the existing index, recreate it and reindex. This means that your search is down (and possibly other parts of Invenio) until the reindexing has finished, which could potentially take quite some time, causing service disruption.
Proposal
Add features to Invenio-Indexer/Search that allow creating a new index alongside the current index. Then take a timestamp T1 and query for all records in a given PID namespace that were modified before this timestamp and send them for reindexing. Once they are all reindexed, take a new timestamp T2 and reindex any record that was modified in the T1-T2 period. Repeat this until there are no changes (or up to a maximum number of iterations). Afterwards you change the alias from the old index to the new one, and your search now uses the new index and mapping without any downtime.
Issue: index names are currently enforced by invenio-search and you cannot rename an index (AFAIK), i.e. you need something like this (which I don't think invenio-search is capable of today).
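The T1/T2 iteration described above can be sketched as a small loop. This is only an illustration of the control flow: the callables modified_between and reindex are hypothetical placeholders for the actual query and bulk-reindex machinery.

```python
from datetime import datetime

def reindex_without_downtime(modified_between, reindex, max_iterations=10):
    """Reindex everything modified before a timestamp, then repeat for
    records modified in each subsequent window until no changes remain,
    at which point the alias can be switched to the new index."""
    t_prev = datetime.min
    for _ in range(max_iterations):
        t_now = datetime.utcnow()
        batch = modified_between(t_prev, t_now)
        if not batch:
            break  # no changes left: safe to flip the alias now
        reindex(batch)
        t_prev = t_now
```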
I got this error:
from invenio_search.utils import build_alias_name
ImportError: cannot import name 'build_alias_name'
The problem is that since version 1.1.0 it uses build_alias_name, which is available only in invenio-search>=1.2.0. A new constraint should be added in setup.py.
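The kind of constraint that would fix it could look like this (a sketch only; the extras name and upper bound are assumptions, only the >=1.2.0 lower bound follows from the report above):

```python
# setup.py (sketch): invenio-indexer>=1.1.0 imports build_alias_name,
# which only exists in invenio-search>=1.2.0, so pin accordingly.
install_requires = [
    'invenio-search[elasticsearch7]>=1.2.0',
]
```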
The indexer has several instance attributes, but the model is still a class attribute. This means that different indexers will always share the same model.
In RDM there is a need to have different indexers with different model classes. To keep it backwards compatible, we could simply add an instance attribute with the same name (Python would default to the class attribute if the instance one is not given).
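The backwards-compatible shadowing pattern can be sketched like this (the attribute name record_cls and the stand-in classes are assumptions for illustration):

```python
class Record:
    """Stand-in for the default record model."""

class RDMRecord:
    """Stand-in for a custom record model."""

class RecordIndexer:
    # Existing class attribute, kept for backwards compatibility.
    record_cls = Record

    def __init__(self, record_cls=None):
        # Setting an instance attribute shadows the class attribute only
        # when a custom class is given; Python falls back to the class
        # attribute otherwise, so old callers are unaffected.
        if record_cls is not None:
            self.record_cls = record_cls
```

With this, two indexers in the same process can use different model classes without breaking existing code.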
Bulk indexing or indexing by id with invenio-records is not possible for records using their own record model. The invenio-indexer should use the model_cls from invenio-records.
The function _bulk_op does not handle problems with RabbitMQ very well. For example, if we have memory restrictions for RabbitMQ and we get very low on memory, this function will fail without log messages and without waiting for RabbitMQ to respond again.
See following error message from RabbitMQ:
memory resource limit alarm set on node 'rabbit@mef'.
**********************************************************
*** Publishers will be blocked until this alarm clears ***
**********************************************************
Possible breaking changes
Every time Elasticsearch returns an error (e.g. there is some non-escaped forward slash in the query, which is not accepted in the out-of-the-box configuration), the return value is not JSON but a Java traceback.
In troubleshooting there is a note for these cases: inveniosoftware/troubleshooting#24
Would it be better to treat non-escaped values in the application before sending them to Elasticsearch? Or to capture the ValueError exception, just log it and return an error or empty JSON?
From my point of view the second option is more flexible, as it allows custom ES configurations.
ES Reserved Characters here
PS: This also applies when searching, therefore I am not sure whether it belongs more to Invenio-Search or even Invenio-Records-Rest.
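For the first option (escaping in the application), a sketch could look like this. The character list follows the Elasticsearch query_string documentation; note that per that documentation < and > cannot be escaped at all and must be removed:

```python
import re

# Reserved query_string characters per the Elasticsearch documentation.
RESERVED = '+-=&|!(){}[]^"~*?:\\/'

def escape_es_query(value):
    """Escape reserved characters in user input before sending it to ES."""
    value = value.replace('<', '').replace('>', '')
    return re.sub('([{}])'.format(re.escape(RESERVED)), r'\\\1', value)
```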
The following cookiecutter change:
inveniosoftware/cookiecutter-invenio-module#98
should be propagated to this Invenio module.
Namely, in run-tests.sh, the sphinx doctest build is invoked after the pytest run:
$ tail -3 ./\{\{\ cookiecutter.project_shortname\ \}\}/run-tests.sh
sphinx-build -qnNW docs docs/_build/html && python setup.py test && sphinx-build -qnNW -b doctest docs docs/_build/doctest
This sometimes led to problems on Travis CI with the second sphinx-build run due
to "disappearing" dependencies after the example application was tested.
A solution that worked for invenio-marc21 (see
inveniosoftware/invenio-marc21#49 (comment))
and that was integrated in cookiecutter (see
inveniosoftware/cookiecutter-invenio-module#98) was to
run doctest execution in pytest, removing the second sphinx-build
invocation.
This both solved Travis CI build failures and simplified test suite execution.
Note that this change may necessitate to amend the code tests etc so that things
would be executed with the Flask application context (see
inveniosoftware/invenio-marc21@09e98fc).
With ES 6 we get the following deprecation warnings:
[WARN ][o.e.d.c.ParseField ] Deprecated field [_version] used, expected [version] instead
[WARN ][o.e.d.c.ParseField ] Deprecated field [_version_type] used, expected [version_type] instead
Problem:
The only way to index records in multiple custom indexes (not depending on the JSON-Schema URI) is to create a custom RecordIndexer and give it a custom record_to_index function.
The CLI and tasks recreate the RecordIndexer instances at each call, so they will always use the default record_to_index. It is thus not possible to create a custom RecordIndexer, as the CLI would behave differently.
Possible Solutions:
1/ Have only one RecordIndexer. A configuration variable would point to the RecordIndexer class, and the CLI should use it. Effect on other projects: they would need to configure their own RecordIndexer.
2/ Allow multiple RecordIndexer instances, registered on the InvenioIndexer extension, together with a way to specify which RecordIndexer should be used. Problem: how do we know which record should be indexed with which RecordIndexer?
3/ Remove the CLI. Problem: the database and the Elasticsearch index will eventually stop being consistent (one server crash is enough), so being able to reindex everything is mandatory.
Add missing API docs as stated in #57.
It should be possible to skip/abort indexing for a record during the RecordIndexer._prepare_record stage. There are two options for that, both involving behavior in the receivers of the before_record_index signal:
1. Raise a dedicated exception (e.g. raise SkipIndexing()), which would of course be handled appropriately in the _prepare_record method. This solution is backward compatible but uses exceptions (which are slow-ish).
2. Have the receiver return None or an empty dictionary ({}). The result of ._prepare_record would then be checked in the .index and ._index_action methods to see if the resulting data is empty. The problem with this solution is that current implementations of receivers are probably not aware that the passed json dictionary parameter might end up being empty.
The use case behind this is that a record might get sent for bulk indexing, but is later (soft-)deleted while it is still in the bulk indexing queue. When the bulk indexer consumer runs, it should skip this record.
Investigate failing deployment.
Problem:
If, after installing invenio3, I run:
invenio3 index reindex --yes-i-know
invenio3 index run
it doesn't index the records, because the queue was not previously declared.
What we need to do is declare the queue the first time we initialize the system.
Solution proposed:
We can add signals inside the invenio-search CLI commands init and destroy; at the same time, invenio-indexer registers receivers for these signals to initialize and destroy the queue every time the indices are created or destroyed.
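The proposed wiring can be sketched as follows. The signal names are assumptions (invenio-search does not currently send them), and a tiny Signal class stands in for blinker to keep the sketch self-contained:

```python
class Signal:
    """Minimal stand-in for a blinker signal, just for this sketch."""
    def __init__(self):
        self._receivers = []
    def connect(self, fn):
        self._receivers.append(fn)
    def send(self, sender=None):
        for fn in self._receivers:
            fn(sender)

# Hypothetical signals that invenio-search would send from its CLI:
index_created = Signal()    # sent by `invenio index init`
index_destroyed = Signal()  # sent by `invenio index destroy`

queue = {'declared': False}

# invenio-indexer would register receivers that manage its bulk queue:
index_created.connect(lambda sender: queue.update(declared=True))
index_destroyed.connect(lambda sender: queue.update(declared=False))
```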
Force the signature to publish(id, op, index, doctype) to make the API more rigid, or add data validation.
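A sketch of what the rigid signature with basic validation could look like (the accepted op values here are assumptions):

```python
def publish(id, op, index, doctype):
    """Build a bulk-queue message from explicit arguments, rejecting
    malformed input early instead of accepting a free-form payload."""
    if op not in ('index', 'delete'):
        raise ValueError('unknown op: {!r}'.format(op))
    if not index or not doctype:
        raise ValueError('index and doctype are required')
    return {'id': str(id), 'op': op, 'index': index, 'doc_type': doctype}
```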
The package contains an examples/app.py example application, but there is no test for it in tests/test_example_app.py. It should be added. See existing examples:
The RecordIndexer class accepts an optional record_cls param, but the CLI and tasks do not, and they use the default Record class.
Such methods cannot be used with different record classes.
One possible solution would be to define a mapping rec type -> rec class and pass the record type as an extra param in CLI and task args.
See the attempt to fix it in this WIP PR.
Make indexer record_cls configurable, similar to record service indexer.
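One way to make record_cls configurable through CLI/task arguments is to pass it as a dotted import path and resolve it at runtime. This is a sketch only; the 'module:Class' parameter shape is an assumption:

```python
import importlib

def load_record_cls(path):
    """Resolve a record class from a 'module:Class' import path passed as
    a CLI or Celery task argument, so tasks need not hardcode Record."""
    module_name, cls_name = path.split(':')
    return getattr(importlib.import_module(module_name), cls_name)
```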
Add the missing docstring directives (param, returns, raises, versionadded). Remove the experimental note from the README.rst.
Currently, the API supports bulk index and delete. It would be nice to have a bulk_update, which would allow partial updates to indexed documents. The API calls could look something like this:
class RecordIndexer(object):
    ...

    def bulk_update(self, record_patch_iterator):
        # record_patch_iterator is an iterable of 2-tuples with a record UUID
        # and a dictionary with the patch/update (see example below)
        self._bulk_op(record_patch_iterator, 'update')

    def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
        with self.create_producer() as producer:
            if op_type == 'update':
                for rec, patch in record_id_iterator:
                    producer.publish(dict(
                        id=str(rec),
                        patch=patch,
                        op='update',
                        index=index,
                        doc_type=doc_type,
                    ))
            else:
                ...

    def _update_action(self, payload):
        ...
        return {
            '_op_type': 'update',
            '_index': index,
            '_type': doc_type,
            '_id': payload['id'],
            'doc': payload['patch'],
        }

RecordIndexer().bulk_update([
    ('<some-record-uuid>', {'_stats': {'views': 42, 'downloads': 420}}),
    ('<some-record-uuid>', {'_stats': {'views': 100, 'downloads': 200}}),
    ...
])
Undo #94 when celery/kombu#953 is released.
I propose to use this plugin to implement the indexing of attached documents:
Mapper Attachments Plugin
It is a fast and easy solution, maintained by the Elasticsearch team.
As described in the link above, it uses the Tika library to analyze the content of a file.
The mapper-attachments plugin for Elasticsearch provides a new field type named attachment. In order to use it you first need to set up your mapping. Here, for instance, I create a tind-doc index with a tind_document type with 3 properties [filename, insert_date, file]. The file property has the type attachment with its configuration:
PUT tind-doc/
{
  "mappings": {
    "tind_document": {
      "_source": {
        "excludes": ["file"]
      },
      "properties": {
        "file": {
          "type": "attachment",
          "fields": {
            "content": {
              "type": "string",
              "term_vector": "with_positions_offsets",
              "store": "yes"
            },
            "author": {
              "type": "string"
            },
            "title": {
              "type": "string"
            },
            "name": {
              "type": "string"
            },
            "date": {
              "type": "date",
              "format": "strict_date_optional_time||epoch_millis"
            },
            "keywords": {
              "type": "string"
            },
            "content_type": {
              "type": "string"
            },
            "content_length": {
              "type": "integer"
            },
            "language": {
              "type": "string",
              "store": "yes"
            }
          }
        },
        "filename": {
          "type": "string"
        },
        "insert_date": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        }
      }
    }
  }
}
Once you define "type": "attachment", there are several fields provided automatically by the Tika parser that can be set up with proper mapping customization (by default, language recognition is disabled; see the link above to enable it).
Anyway, I found that most of these fields, in our case, are not properly recognized, and we can get rid of them.
The most important is the content field, which is where our file will be inserted; the "term_vector" option is required for the highlights feature shown later.
For indexing a document, ES requires a base64-encoded string. Below I show a working script to encode a document and put it into ES.
import base64
from datetime import datetime

# DocType, String and Date come from elasticsearch_dsl; Attachment is the
# field type contributed by the mapper-attachments plugin integration.
from elasticsearch_dsl import Date, DocType, String
from elasticsearch_dsl.connections import connections


class TindDocument(DocType):
    filename = String()
    insert_date = Date()
    file = Attachment(fields={
        'content': String(term_vector="with_positions_offsets", store=True),
    })

    class Meta:
        index = 'tind-doc'


connections.create_connection(hosts=['localhost'])

for input_file in doc_files:
    with open(input_file, "rb") as ifd:
        b = base64.b64encode(ifd.read())
    attachment = dict(
        _content=b,
        _indexed_chars=-1,
        _detect_language=True,
    )
    firstdoc = TindDocument(
        filename=input_file, insert_date=datetime.now(), file=attachment)
    firstdoc.save()
There are some options that you can provide:
- _detect_language if you want ES to try to detect the language of the document (I found that it fails for Arabic and Chinese).
- _indexed_chars to limit indexing to a fixed amount of characters (-1 for the full document; 100k by default).
Here are some search examples:
Note the highlights feature in the results, and the poor recognition of the Chinese language.
More than 300 documents were indexed in less than 20 seconds (from 20k to 9M per document).
Some measurements made with ES Marvel: the first is about the index, the second about the node (the number of documents includes the Kibana and Marvel data).
Figure out how to integrate this feature in Invenio.
This issue stems from https://github.com/inveniosoftware/invenio-cli potentially re-running invenio index destroy
multiple times under the covers and not wanting to throw an exception when the index is already destroyed.
Like Ansible, invenio-cli's goal (and by extension these CLI commands' goal) is to reach a state rather than perform a certain action.
Flask-SQLAlchemy 2.2 changed the way it picks a sender for the model signals, so changes need to be made in tests/test_utils.py so that the tests pass with this version.
Moreover, this problem was hard to detect because the test case for record_indexer.process_bulk_queue() (https://github.com/inveniosoftware/invenio-indexer/blob/master/tests/test_utils.py#L88) is wrong: it should check the result content instead of its length.
When I run the tests I get the exception:
tests/test_utils.py:105:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
invenio_indexer/api.py:195: in process_bulk_queue
request_timeout=req_timeout,
../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:190: in bulk
for ok, item in streaming_bulk(client, actions, **kwargs):
../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:162: in streaming_bulk
for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <LocalProxy unbound>, bulk_actions = ['{"delete": {"_index": "records-default-v1.0.0", "_type": "default-v1.0.0", "_id": "7133d6e5-a332-455d-b0c8-973ca420f...ete": {"_index": "records-default-v1.0.0", "_type": "default-v1.0.0", "_id": "7133d6e5-a332-455d-b0c8-973ca420f686"}}'], raise_on_exception = True, raise_on_error = True
kwargs = {'request_timeout': 10}, errors = [{'delete': {'_id': '7133d6e5-a332-455d-b0c8-973ca420f686', '_index': 'records-default-v1.0.0', '_shards': {'failed': 0, 'successful': 1, 'total': 2}, '_type': 'default-v1.0.0', ...}}], resp = {'errors': False, 'items': [{}, {}], 'took': 7}
def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
[...TRUNCATED FOR BREVITY...]
if errors:
> raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
E elasticsearch.helpers.BulkIndexError: ('1 document(s) failed to index.', [{'delete': {'found': False, '_version': 2, '_shards': {'successful': 1, 'total': 2, 'failed': 0}, '_id': '7133d6e5-a332-455d-b0c8-973ca420f686', 'status': 404, '_type': 'default-v1.0.0', '_index': 'records-default-v1.0.0'}}])
../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:134: BulkIndexError
After some more digging it looks like two delete messages are queued when one is published here.
This only happens when tests/test_cli.py::test_reindex is run before tests/test_utils.py::test_record_indexing, like this:
py.test tests/test_cli.py::test_reindex tests/test_utils.py::test_record_indexing
If tests/test_utils.py::test_record_indexing is run alone, the issue does not appear.
I tracked down the issue to the BROKER_URL config variable. The problem arises only with BROKER_URL=os.environ.get('BROKER_URL', 'memory://'), but not with RabbitMQ or Redis.
It sometimes happens that we add a record to the bulk queue two or more times in a short period (because e.g. different Celery tasks are working with it).
We were wondering if we can/need to optimize the bulk operation on the DB side and on the ES side.
Does elasticsearch-py handle this situation automatically?
Could we optimize in a way that a record is read from the database no more than once?
Could we maybe use Record.get_records() to improve the reads instead of a lot of Record.get_record() calls?
This is my first attempt at not indexing a record twice.
For sure we can find a better solution, but it gives you an idea of what I mean.
cc @egabancho
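One possible optimization, deduplicating the batch before hitting the database, can be sketched like this (the function name is hypothetical; it would run in the bulk consumer before fetching records):

```python
def dedup_record_ids(record_ids):
    """Drop duplicate record ids from a bulk batch while preserving order,
    so each record is fetched from the database at most once."""
    seen = set()
    unique = []
    for rid in record_ids:
        if rid not in seen:
            seen.add(rid)
            unique.append(rid)
    return unique
```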
The elasticsearch.helpers.bulk function takes various parameters that control things such as how to handle errors/exceptions when bulk indexing (e.g. for continuing when encountering version conflict errors). Currently, there is no way of passing these to invenio_indexer.tasks.process_bulk_queue or RecordIndexer.process_bulk_queue.
Allowing just **kwargs would probably be enough, though exposing some of these to the CLI might be useful as well.
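The pass-through could be sketched like this. The parameter name es_bulk_kwargs is an assumption about the proposed change, and a stand-in function replaces elasticsearch.helpers.bulk to keep the sketch self-contained:

```python
def es_bulk(client, actions, **kwargs):
    """Stand-in for elasticsearch.helpers.bulk that just echoes its kwargs."""
    return kwargs

def process_bulk_queue(es_bulk_kwargs=None):
    """Accept an es_bulk_kwargs dict and forward it to the bulk helper,
    e.g. raise_on_error=False to continue past version conflicts."""
    es_bulk_kwargs = es_bulk_kwargs or {}
    return es_bulk(None, [], **es_bulk_kwargs)
```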