invenio-indexer's Issues

Parent-Child support

Add the possibility to specify the parent id and parent document type during indexing.

Make CLI record_cls compatible

At the moment the index reindex and index run commands don't respect the record_cls when indexing records. For record classes with separate tables, indexing is not possible this way.

The best approach would be to get the record class back from the uuid: get the pid_type from the PersistentIdentifier via PersistentIdentifier.query.filter_by(object_uuid=uuid).one().pid_type and use this information to look up the record class in config.py. Alternatively, we could transport this information in the payload, because bulk_index_task uses the RecordIndexer to index the uuids without having the correct definition of record_cls.
We have to use the correct record class in api.py at lines 280 and 298.
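
A rough sketch of the first idea (the PID_TYPE_TO_RECORD_CLASS config name and the helper below are hypothetical, not existing invenio-indexer API):

from flask import current_app
from invenio_pidstore.models import PersistentIdentifier
from werkzeug.utils import import_string


def record_cls_from_uuid(uuid):
    """Resolve the record class for a record UUID via its PID type."""
    pid_type = PersistentIdentifier.query.filter_by(
        object_uuid=uuid).one().pid_type
    # Hypothetical config, e.g. {'recid': 'invenio_records.api:Record'}.
    mapping = current_app.config.get('PID_TYPE_TO_RECORD_CLASS', {})
    return import_string(mapping.get(pid_type, 'invenio_records.api:Record'))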

Make CLI index init idempotent

This issue stems from https://github.com/inveniosoftware/invenio-cli potentially re-running invenio index init multiple times under the covers and not wanting to throw an exception when the index is already created.

Like Ansible, invenio-cli's goal (and by extension this CLI command's goal) is to reach a desired state rather than to perform a certain action.
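
For illustration, a minimal sketch of the idempotent behaviour using the raw Elasticsearch client (the real fix would of course live in the invenio index init code path, which is not shown here):

from elasticsearch import Elasticsearch

client = Elasticsearch()
# `ignore=400` turns an "index already exists" error into a no-op, so the
# command can safely be re-run until the desired state is reached.
client.indices.create(index='records-record-v1.0.0', ignore=400)
client.indices.create(index='records-record-v1.0.0', ignore=400)  # still fine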

cli: reindex does not take deleted pid into account

The reindex command assumes that all records in the database can be indexed. This assumption works only for simple usage and doesn't cover e.g. cases where you have a deleted persistent identifier with a non-deleted record (e.g. to store a removal reason).

Also, the reindex command does not remove records from the index which no longer exist in the database, hence it possibly won't bring your index into sync with your database, which is usually what you need the command to do.

installation: celery vs flask-celeryext?

Shall we rely on the Celery version defined in Flask-CeleryExt? Currently, this causes problems in many packages that depend on Invenio-Indexer and do not specify Flask-CeleryExt as a direct dependency.

Control indexer revision_id to avoid ES failure

When indexing, the revision_id of the record is sent directly:

return self.client.index(
    id=str(record.id),
    version=record.revision_id,
    version_type=self._version_type,
    index=index,
    doc_type=doc_type,
    body=self._prepare_record(record, index, doc_type),
)

It is true that Elasticsearch will cross-check it, but if an update arrives out of order or the value is somehow tampered with (e.g. via signals), Elasticsearch will throw an exception.

Would it be good to add a check and avoid the indexing error? Maybe even roll back the DB commit in invenio-records-rest to avoid an inconsistent state?

This last question might apply not only to this case but in general: check the indexing result and roll back if needed (although this should live in invenio-records-rest).
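
A hedged sketch of such a check (this is not current behaviour; whether to skip, retry or roll back is exactly the open question):

from elasticsearch.exceptions import ConflictError
from flask import current_app
from invenio_indexer.api import RecordIndexer


def index_record_safely(record):
    """Index a record, trapping the external-version conflict."""
    try:
        RecordIndexer().index(record)
    except ConflictError:
        # The revision_id sent as external version was not newer than the one
        # already stored in ES (out-of-order update, tampered signal, ...).
        # Here one could skip, re-fetch and retry, or roll back the DB commit.
        current_app.logger.warning(
            'Stale revision_id for record %s; skipping indexing.', record.id)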

RFC api: zero down-time reindexing

Problem

Without changing the JSONSchema, you may want to improve the Elasticsearch mapping, especially if you, say, add computed fields. This requires creating a new index with the new mapping and reindexing all records into the new index.

Right now, you would have to destroy the existing index, recreate it and reindex. This means that your search (and possibly other parts of Invenio) is down until the reindexing has finished, which could take quite some time and cause service disruption.

Proposal

Add features to Invenio-Indexer/Search that allow creating a new index alongside the current one. Then take a timestamp T1, query for all records in a given PID namespace that were modified before this timestamp, and send them for reindexing. Once they are all reindexed, take a new timestamp T2 and reindex any record that was modified in the T1-T2 period. You can repeat this until there are no changes (or up to a maximum number of iterations). Afterwards you switch the alias from the old index to the new one, and your search now uses the new index and mapping without any downtime. A rough sketch of this loop is given below.
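
A rough sketch of the loop described above, assuming hypothetical helpers for querying modification times and for switching the alias (neither exists today):

from datetime import datetime


def zero_downtime_reindex(indexer, ids_modified_before, ids_modified_between,
                          switch_alias, max_rounds=5):
    """Reindex into a freshly created index, then flip the alias.

    `ids_modified_before`, `ids_modified_between` and `switch_alias` are
    hypothetical callables; `indexer` is a RecordIndexer targeting the new index.
    """
    t1 = datetime.utcnow()
    indexer.bulk_index(ids_modified_before(t1))   # existing RecordIndexer API
    indexer.process_bulk_queue()
    for _ in range(max_rounds):
        t2 = datetime.utcnow()
        changed = list(ids_modified_between(t1, t2))
        if not changed:
            break
        indexer.bulk_index(changed)
        indexer.process_bulk_queue()
        t1 = t2
    switch_alias()  # point the alias at the new index; drop the old one later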

Issues: index names are currently enforced by invenio-search and you cannot rename an index (AFAIK), i.e. you need something like this (which I don't think invenio-search is capable of today):

  • records-record-v1.0.0
  • records-record-v1.0.0-2

invenio-search dependency

I got this error:

    from invenio_search.utils import build_alias_name
ImportError: cannot import name 'build_alias_name'

The problem is that, since version 1.1.0, invenio-indexer uses build_alias_name, which is available only in invenio-search>=1.2.0. A new constraint should be added in setup.py.
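
Something along these lines in setup.py (the exact extras/markers are left out here):

install_requires = [
    # ...
    'invenio-search>=1.2.0',
]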

indexer cls: model should not be a cls attr

The indexer has several instance attributes but the model is still a class attribute. This means that different indexers will always have the same model.

In RDM there is a need for different indexers with different model classes. In order to keep it backwards compatible we could simply add an instance attribute with the same name (Python falls back to the class attribute if the instance attribute is not set).
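
A minimal sketch of that backwards-compatible change (the class body is abbreviated and record_cls is taken as the example attribute name):

from invenio_records.api import Record


class RecordIndexer(object):
    """Abbreviated sketch, not the real class body."""

    record_cls = Record  # current class-level attribute

    def __init__(self, record_cls=None, **kwargs):
        # An instance attribute shadows the class attribute only when given,
        # so code relying on the class-level default keeps working.
        if record_cls is not None:
            self.record_cls = record_cls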

Use of the invenio-records model

Bulk indexing or indexing by id is not possible for records that use their own record model. The invenio-indexer should use the model_cls from invenio-records.

More robust bulk indexing

The function _bulk_op does not handle problems with RabbitMQ very well. For example, if we have memory restrictions on RabbitMQ and memory gets very low, this function will fail without any log messages and without waiting for RabbitMQ to respond again.

See following error message from RabbitMQ:

memory resource limit alarm set on node 'rabbit@mef'.
**********************************************************
*** Publishers will be blocked until this alarm clears ***
**********************************************************

App crashes every time Elasticsearch returns an error

Every time Elasticsearch returns an error (e.g. there is some non-escaped forward slash in the query, which is not accepted in the out-of-the-box configuration), the return value is not JSON but a Java traceback.

In troubleshooting there is a note for these cases: inveniosoftware/troubleshooting#24

Would it be better to treat non-escaped values in the application before sending them to Elasticsearch? Or capture the ValueError exception, just log it, and return an error or an empty JSON?

From my point of view the second option is more flexible and allows custom ES configurations (a sketch follows below the PS).

ES Reserved Characters here

PS: This also applies when searching, therefore I am not sure if it applies more to Invenio-Search or even Invenio-Records-Rest
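
A hedged sketch of that second option (the handler below is illustrative; the exact exception type and response shape would need to be agreed on):

from elasticsearch.exceptions import RequestError
from flask import jsonify


def register_es_errorhandler(app):
    """Turn ES query-parsing errors into a clean JSON response."""
    @app.errorhandler(RequestError)
    def handle_es_request_error(error):
        app.logger.exception('Invalid Elasticsearch query')
        return jsonify({'status': 400, 'message': 'Invalid query syntax.'}), 400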

tests: simplify doctest execution

The following cookiecutter change:

inveniosoftware/cookiecutter-invenio-module#98

should be propagated to this Invenio module.

Namely, in run-tests.sh, the sphinx doctest build is invoked after the pytest run:

$ tail -3 ./\{\{\ cookiecutter.project_shortname\ \}\}/run-tests.sh
sphinx-build -qnNW docs docs/_build/html && python setup.py test && sphinx-build -qnNW -b doctest docs docs/_build/doctest

This sometimes led to problems on Travis CI with the second sphinx-build run due
to "disappearing" dependencies after the example application was tested.

A solution that worked for invenio-marc21 (see
inveniosoftware/invenio-marc21#49 (comment))
and that was integrated in cookiecutter (see
inveniosoftware/cookiecutter-invenio-module#98) was to
run doctest execution in pytest, removing the second sphinx-build invocation.

This both solved Travis CI build failures and simplified test suite execution.

Note that this change may require amending the tests etc. so that things are executed within the Flask application context (see inveniosoftware/invenio-marc21@09e98fc).

ES: Deprecated field

With ES 6 we get the following deprecation warnings:

[WARN ][o.e.d.c.ParseField       ] Deprecated field [_version] used, expected [version] instead
[WARN ][o.e.d.c.ParseField       ] Deprecated field [_version_type] used, expected [version_type] instead

api: impossibility to use multiple custom elasticsearch indexes

Problem:
The only way to index records in multiple custom indexes (not depending on the JSON-Schema URI) is to create a custom RecordIndexer and give it a custom record_to_index function.

The CLI and tasks recreate RecordIndexer instances at each call, so they will always use the default record_to_index. This makes a custom RecordIndexer impractical, as the CLI would behave differently from the rest of the application.

Possible Solutions:
1/ Have only one RecordIndexer.

  • Add a config variable which can point to an alternative RecordIndexer class. The CLI should use it (a config-lookup sketch is given after the solutions list).
  • Make the indexing methods class methods.
  • Remove customization arguments from the constructor and add them as class attributes.

Effect on other projects:

2/ Allow multiple RecordIndexer.

  • Register a dict of RecordIndexer on the InvenioIndexer extension.
  • Add a parameter to the CLI which enables to choose which RecordIndexer should be used.

Problem: How do we know which record should be indexed with which RecordIndexer?

3/ Remove the CLI

Problem: the database and the elasticsearch index will eventually stop being consistent (one server crash is enough). Thus being able to reindex everything is mandatory.
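
A config-lookup sketch for option 1, as referenced above (the INDEXER_RECORD_INDEXER_CLASS name is hypothetical):

from flask import current_app
from werkzeug.utils import import_string


def indexer_for_cli():
    """Return the indexer instance the CLI and tasks should use."""
    # Hypothetical config key; defaults to the existing RecordIndexer.
    cls_path = current_app.config.get(
        'INDEXER_RECORD_INDEXER_CLASS', 'invenio_indexer.api:RecordIndexer')
    return import_string(cls_path)()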

api: support skipping record when bulk indexing

It should be possible to skip/abort indexing for a record during the RecordIndexer._prepare_record stage. There are two options for that, both involving behavior in the receivers of the before_record_index signal:

  • In the receiver, raise a standardized exception (e.g. raise SkipIndexing()) which will of course be handled appropriately in the _prepare_record method. This solution is backwards compatible but uses exceptions (which are slow-ish).
  • In the receiver, return None or an empty dictionary ({}). The result of ._prepare_record should then be checked in the .index and ._index_action methods to see whether the resulting data is empty. The problem with this solution is that current receiver implementations are probably not aware that the passed json dictionary parameter might end up being empty...

The use case behind this is that a record might get sent for bulk indexing but is later (soft-)deleted while it's still in the bulk indexing queue. When the bulk indexer consumer runs, it should skip this record.
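
A hedged sketch of the first option (the SkipIndexing exception is hypothetical and _prepare_record would need to be taught to catch it):

from invenio_indexer.signals import before_record_index


class SkipIndexing(Exception):
    """Hypothetical marker exception, to be handled in _prepare_record."""


@before_record_index.connect
def skip_deleted_records(sender, json=None, record=None, **kwargs):
    # Assumes soft-deleted records have their model JSON set to None.
    if record is not None and record.model.json is None:
        raise SkipIndexing()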

api: reindex doesn't work if the queue is not initialized

Problem:
If, after installing invenio3, I run:

invenio3 index reindex --yes-i-know
invenio3 index run

It doesn't index the records because the queue has not previously been declared.
What we need to do is declare the queue the first time we initialize the system.

Solution proposed:
We can add signals to the invenio-search CLI init and destroy commands and, at the same time, register a signal handler in invenio-indexer to initialize and destroy the queue every time the indices are created or destroyed (a rough sketch follows).
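
A rough sketch of the invenio-indexer side (the index_created signal is hypothetical and would first have to be added to invenio-search; INDEXER_MQ_QUEUE is assumed to be the kombu Queue from the indexer config):

from flask import current_app
from kombu import Connection


def declare_indexer_queue(sender, **kwargs):
    """Declare the bulk-indexing queue when the indices are created."""
    queue = current_app.config['INDEXER_MQ_QUEUE']
    with Connection(current_app.config['BROKER_URL']) as conn:
        queue(conn.default_channel).declare()

# Registration, once the hypothetical signal exists in invenio-search:
# index_created.connect(declare_indexer_queue)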


cc @lnielsen @jirikuncar

Bulk index CLI/task: support different record types/class

The RecordIndexer class accepts record_cls as an optional param, but the CLI and tasks do not, and they use the default Record class.
These entry points therefore cannot be used with different record classes.

One possible solution would be to define a mapping of record type to record class and pass the record type as an extra param in the CLI and task args.
See the attempt to fix it in this WIP PR.

api: stabilise and document

  • check existing API functionality
    • api: check signature of Publisher.publish or add data validation #4
    • api: impossibility to use multiple custom elasticsearch indexes #30
  • add missing important API functionality
  • check API function signatures and parameters
  • enhance API docstrings (param, returns, raises, versionadded)
  • plug API functions to existing docs

api: support bulk updates

Currently, the API supports bulk index and delete. It would be nice to have a bulk_update, which would allow partial updates to indexed documents. The API calls could look something like this:

class RecordIndexer(object):
...
    def bulk_update(self, record_patch_iterator):
        # record_patch_iterator is an iterable of 2-tuples with a record UUID
        # and a dictionary with the patch/update (see example below)
        self._bulk_op(record_patch_iterator, 'update')

    def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
        with self.create_producer() as producer:
            if op_type == 'update':
                for rec, patch in record_id_iterator:
                    producer.publish(dict(
                        id=str(rec),
                        patch=patch,
                        op='update',
                        index=index,
                        doc_type=doc_type,
                    ))
            else:
                ...

    def _update_action(self, payload):
        ...
        return {
            '_op_type': 'update',
            '_index': index,
            '_type': doc_type,
            '_id': payload['id'],
            'doc': payload['patch'],
        }

RecordIndexer().bulk_update([
    ('<some-record-uuid>', {'_stats': {'views': 42, 'downloads': 420}}),
    ('<some-record-uuid>', {'_stats': {'views': 100, 'downloads': 200}}),
    ...
])

Implement full-text indexing for attached documents

Full-text indexing with the ES Mapper Attachments Plugin

I propose to use this plugin to implement the indexing of attached documents:
Mapper Attachments Plugin

It is a fast and easy solution, maintained by the Elasticsearch team.
As described in the link above, it uses the Tika library to analyze the content of a file.

A first example

The Mapper Attachments plugin for Elasticsearch provides a new field type named attachment. In order to use it, you first need to set up your mapping. Here, for instance, I create a tind-doc index with a tind_document type and 3 properties [filename, insert_date, file]. The file property has the type attachment with its configuration:

PUT tind-doc/
{
    "mappings": {
      "tind_document": {
        "_source":{
          "excludes":["file"]
        },
        "properties": {
          "file": {
            "type": "attachment",
            "fields": {
              "content": {
                "type": "string",
                "term_vector": "with_positions_offsets",
                "store":"yes"
              },
              "author": {
                "type": "string"
              },
              "title": {
                "type": "string"
              },
              "name": {
                "type": "string"
              },
              "date": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis"
              },
              "keywords": {
                "type": "string"
              },
              "content_type": {
                "type": "string"
              },
              "content_length": {
                "type": "integer"
              },
              "language": {
                "type": "string",
                "store":"yes"
              }
            }
          },
          "filename": {
            "type": "string"
          },
          "insert_date": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          }
        }
      }
    }
}

Once you define "type": "attachment", there are several fields provided automatically by the Tika parser that can be set up with proper mapping customization (by default, language recognition is disabled; see the link above to enable it). Anyway, I found that most of these fields are not properly recognized in our case, so we can get rid of them.

The most important one is the content field, which is where our file content is inserted; the "term_vector" option is required for the highlighting feature shown later.

Index a document

To index a document, ES requires a base64-encoded string. Below I show a working script to encode a document and put it into ES.

# Requires the elasticsearch_dsl package (an older release that still ships
# the String and Attachment field types for the mapper-attachments plugin).
import base64
from datetime import datetime

from elasticsearch_dsl import Attachment, Date, DocType, String
from elasticsearch_dsl.connections import connections


class TindDocument(DocType):
    filename = String()
    insert_date = Date()
    file = Attachment(fields={
        'content': String(term_vector="with_positions_offsets", store=True),
    })

    class Meta:
        index = 'tind-doc'


connections.create_connection(hosts=['localhost'])

doc_files = ['example.pdf']  # paths of the documents to index

for input_file in doc_files:
    with open(input_file, "rb") as ifd:
        b = base64.b64encode(ifd.read())

    attachment = dict(
        _content=b,
        _indexed_chars=-1,       # index the full document
        _detect_language=True,   # ask Tika to guess the language
    )

    firstdoc = TindDocument(
        filename=input_file, insert_date=datetime.now(), file=attachment)
    firstdoc.save()

There are some options that you can provide.

  • _detect_language, if you want ES to try to detect the language of the document (I found that it fails for Arabic and Chinese).
  • You can use _indexed_chars to limit indexing to a fixed number of characters (-1 for the full document; 100k by default).

Search

Here are some search examples:

Note the highlighting feature in the results, and the poor recognition of the Chinese language.
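
The screenshots below come from queries along these lines; a hedged sketch of such a highlighted full-text search with elasticsearch_dsl (reusing the connection and mapping above, with an example search term) is:

from elasticsearch_dsl import Q, Search

s = Search(index='tind-doc') \
    .query(Q('match', **{'file.content': 'information retrieval'})) \
    .highlight('file.content', fragment_size=100)

for hit in s.execute():
    print(hit.filename)
    for fragment in hit.meta.highlight['file.content']:
        print(fragment)  # highlighted snippets from the extracted text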

[Screenshots: eng-long, stupid-search, chinese-short]

Aggregations:

[Screenshot: aggs]

Some performance measures

More than 300 documents indexed in less than 20 seconds (from 20 kB to 9 MB per document):

[Screenshot: 300-docs]

Some measurements made with ES Marvel. The first is about the index:

[Screenshot: index-graph-600-docs]

The second is about the node (the document count also includes the Kibana and Marvel data):

[Screenshot: node-graphs]

Next step:

Figure out how to integrate this feature into Invenio.

tests: conflict between CliRunner and Celery in memory broker

When I run the tests I get the exception:

tests/test_utils.py:105:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
invenio_indexer/api.py:195: in process_bulk_queue
    request_timeout=req_timeout,
../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:190: in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:162: in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

client = <LocalProxy unbound>, bulk_actions = ['{"delete": {"_index": "records-default-v1.0.0", "_type": "default-v1.0.0", "_id": "7133d6e5-a332-455d-b0c8-973ca420f...ete": {"_index": "records-default-v1.0.0", "_type": "default-v1.0.0", "_id": "7133d6e5-a332-455d-b0c8-973ca420f686"}}'], raise_on_exception = True, raise_on_error = True
kwargs = {'request_timeout': 10}, errors = [{'delete': {'_id': '7133d6e5-a332-455d-b0c8-973ca420f686', '_index': 'records-default-v1.0.0', '_shards': {'failed': 0, 'successful': 1, 'total': 2}, '_type': 'default-v1.0.0', ...}}], resp = {'errors': False, 'items': [{}, {}], 'took': 7}

    def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):

[...TRUNCATED FOR BREVITY...]

        if errors:
>           raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
E           elasticsearch.helpers.BulkIndexError: ('1 document(s) failed to index.', [{'delete': {'found': False, '_version': 2, '_shards': {'successful': 1, 'total': 2, 'failed': 0}, '_id': '7133d6e5-a332-455d-b0c8-973ca420f686', 'status': 404, '_type': 'default-v1.0.0', '_index': 'records-default-v1.0.0'}}])

../../lib/python3.5/site-packages/elasticsearch/helpers/__init__.py:134: BulkIndexError

After some more digging, it looks like two delete messages are queued when only one is published here.

This only happens when tests/test_cli.py::test_reindex is run before tests/test_utils.py::test_record_indexing, like this:

py.test tests/test_cli.py::test_reindex tests/test_utils.py::test_record_indexing

If tests/test_utils.py::test_record_indexing is run alone, the issue does not appear.

I tracked the issue down to the BROKER_URL config variable. The problem arises only with BROKER_URL=os.environ.get('BROKER_URL', 'memory://'), not with RabbitMQ or Redis.

api: bulk indexing two or more times a record

Problem

It sometimes happens that we add a record to the bulk queue two or more times in a short period (e.g. because different Celery tasks are working with it).
We were wondering whether we can/need to optimize the bulk operation on the DB side and on the ES side.

Questions

  1. Does elasticsearch-py handle this situation automatically?

  2. Could we optimize in a way that a record is read from the database no more than once?

  3. Could we maybe use Record.get_records() to improve the reads instead of a lot of Record.get_record() calls?

My first attempt at not indexing a record twice.
For sure we can find a better solution, but it can give you an idea of what I mean (a rough sketch of the deduplication follows).
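
A rough sketch of that deduplication (a wrapper around the queue consumption, not the actual _actionsiter implementation):

def deduplicate_messages(message_iterator):
    """Yield each queued record message once; ack duplicates immediately."""
    seen = set()
    for message in message_iterator:
        payload = message.decode()
        if payload['id'] in seen:
            message.ack()  # duplicate: skip the extra DB read and ES action
            continue
        seen.add(payload['id'])
        yield message  # handled by RecordIndexer._actionsiter as usual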


cc @egabancho

api: allow specifying parameters for bulk indexing

The elasticsearch.helpers.bulk function takes various parameters that control things such as how to handle errors/exceptions when bulk indexing (e.g. for continuing when encountering version conflict errors). Currently, there is no way of passing these to invenio_indexer.tasks.process_bulk_queue or
RecordIndexer.process_bulk_queue.

Allowing just **kwargs would probably be enough, though exposing some of these to the CLI might be useful as well.
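
A hedged sketch of the **kwargs idea (the queue-consumption part of the real method is elided behind a placeholder):

from elasticsearch.helpers import bulk as es_bulk


def process_bulk_queue(self, **es_bulk_kwargs):
    """Consume the bulk queue, forwarding extra options to the ES bulk helper."""
    # _consume_queue() stands in for the real Kombu consumer set-up.
    actions = self._actionsiter(self._consume_queue())
    return es_bulk(self.client, actions, stats_only=True, **es_bulk_kwargs)

# e.g. keep going on version conflicts instead of aborting the whole run:
# RecordIndexer().process_bulk_queue(raise_on_error=False)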

Redis pin causes app installation to fail

#94

Some applications fail to install since other packages (e.g. invenio-accounts) will first install Redis 3.x.

This pin should be removed soon (when Kombu releases #95). Should it be pinned somewhere else, or should we just advise users (put a note somewhere?) to pin it in their apps?
