dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️

Home Page: https://dlthub.com/docs

License: Apache License 2.0

Languages: Python 99.60% · Makefile 0.11% · Shell 0.25% · Dockerfile 0.05%
Topics: data, python, data-engineering, data-lake, data-loading, data-warehouse, elt, extract, load, transform

dlt's People

Contributors

aarora-quadfi, adrianbr, anuunchin, astrakhantsevaaa, burnash, codingcyclist, dat-a-man, deanja, hibajamal, ilyafaer, jorritsandbrink, matthauskrzykowski, neilgorman104, phillem15, pipboyguy, rahuljo, redicane, rudolfix, sh-rp, steinitzu, sultaniman, tonghere, tungbq, tydunn, vikas-edgevana, violetm, willi-mueller, wtfzambo, z3z1ma, zem360


dlt's Issues

[dbt] add "simple materializations" for generated `dbt` packages.

Motivation
Provided that #118 is implemented, we want to add simple materializations for the tables defined in the schemas of a given pipeline. Those simple materializations will make sure that data is

  • deduplicated
  • merged according to merge keys

You can think of it as an extension of the initial models defined in #118 (initial dbt package)

Requirements

    • for any table that defines a primary_key hint we want a materialization that deduplicates the source table (see the sketch after this list)
    • for any table that defines a merge_key hint we want a materialization that merges the data according to the key
    • keys can be compound
    • tables may have child tables that may also need deduplication and merging
    • use load_id for incremental loading
    • bonus: take distribution, sort and clustering hints into account when merging
    • should work with bigquery, redshift, postgres, duckdb
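For reference, a minimal sketch of how a resource could declare the hints these materializations would consume; the resource and column names are made up, and the exact merge_key handling should be checked against the current dlt.resource signature.

import dlt

@dlt.resource(
    primary_key="event_id",                # used for deduplication
    merge_key=("customer_id", "day"),      # compound merge key
    write_disposition="merge",
)
def events():
    yield {"event_id": 1, "customer_id": 42, "day": "2023-01-01", "value": 10}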

[destination] implement loader for Snowflake with PUT stage

Background
Implement a Snowflake destination based on a PUT stage. It will be quite similar to the BigQuery implementation (just simpler).

Tasks

  • use jsonl as the preferred file format
  • use PUT to stage
  • allow an option to remove staged files when COPY INTO succeeded
  • our jsonl uses base64 to store binary data, so enable BINARY_FORMAT in COPY INTO: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table
  • fail on any error on COPY INTO, return up to 100 errors
  • the replace load most likely needs TRUNCATE; do TRUNCATE and COPY in one transaction! (see the sketch below)
  • implement the cluster hint with CLUSTER BY
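A rough sketch of the PUT + COPY INTO flow with snowflake-connector-python; stage, table, file and credential values are placeholders, and the exact COPY options should be verified against the Snowflake docs linked above.

import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account", user="loader", password="...",
    warehouse="COMPUTE_WH", database="DLT_DATA", schema="EVENTS_DATASET",
)
cur = conn.cursor()
try:
    cur.execute("BEGIN")
    # upload the local jsonl file into the table stage
    cur.execute("PUT file:///tmp/load_1.jsonl @%events AUTO_COMPRESS=TRUE")
    # copy from the stage; base64-encoded binaries need BINARY_FORMAT
    cur.execute(
        "COPY INTO events FROM @%events "
        "FILE_FORMAT=(TYPE='JSON' BINARY_FORMAT='BASE64') "
        "MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE' "
        "ON_ERROR='ABORT_STATEMENT' PURGE=TRUE"   # PURGE removes staged files on success
    )
    cur.execute("COMMIT")
finally:
    cur.close()
    conn.close()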

Authentication
Derived from ConnectionStringCredentials:

    • should expose several query parameters as fields: warehouse, role

Tests

    • we should enable it as a destination in tests. all tests using ALL_DESTINATIONS must pass
    • it should pass the same common tests that the BigQuery job client and sql client pass
    • note that to pass the sql_client tests you'll need to map DBApi exceptions into the 3 categories that dlt needs: relation not found, terminal exceptions, transient exceptions. see the bigquery and postgres implementations. that's manual work

All in all, writing the test suite is more work than the code here

Add date type

Motivation
Low priority: I think the number of professionals who care about date storage size and want to manage it separately is in decline, so consider this low priority unless we learn otherwise. I think most people would prefer UTC timestamps that they can localize in their frontend app.

Why date? Currently we only support timestamp. In most BI databases a native DATE costs half as much to store and can be more than 2x cheaper to join on than a calculated date (compressed value join vs. calculated value join).

Tasks

  1. declare new schema type in dlt/common/schema/typing.py
  2. map python type to schema type in py_type_to_sc_type function (there's TODO already)
  3. map schema type to database type for each destination (ie. dlt/destinations/bigquery/bigquery.py)

optional
4. add type coercions in the coerce_value function: from ISO string and from timestamp (by dropping the time part). also add coercion from date to timestamp (see the sketch after the Tests list)

Tests

  1. add cases to py_type_to_sc_type tests
  2. add cases to coerce_value tests
  3. look at test_load_with_all_types and TABLE_ROW fixture, add datetime to tested columns
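A hedged sketch of the optional coercions from task 4, using only the standard library; the function names are illustrative, not the actual coerce_value implementation.

from datetime import date, datetime, time, timezone

def to_date(value):
    # datetime must be checked before date, since datetime is a date subclass
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        return date.fromisoformat(value)   # e.g. "2023-05-01"
    raise ValueError(f"cannot coerce {value!r} to date")

def date_to_timestamp(value: date) -> datetime:
    # promote a date to a UTC timestamp at midnight
    return datetime.combine(value, time.min, tzinfo=timezone.utc)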

[pipeline] implement state merge logic in case of parallel processing or state restoration from destination

implement proper state management in Pipeline

  1. state as visible by the source is just a dictionary (read and write via indexer)
  2. each source iterator that requires state should operate on its own namespace (Pipeline.state["namespace"] for explicit names or Pipeline.state for implicit names based ie. on the name of the calling module)
  3. writing to state may only happen after the iterator has stopped. a write before that must be detected and the extraction aborted
  4. state is preserved atomically together with the results of the extraction
  5. if many iterators operating on the same state were returned, we have a merge strategy: the maximum value of all conflicting writes is preserved (ie. https://docs.meltano.com/guide/integration#internal-state-merge-logic - see the sketch below Implementation)

Implementation

  1. Consider locking state file when merging (ie. https://py-filelock.readthedocs.io/en/latest/index.html)
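A minimal sketch of the "maximum value wins" merge from point 5, assuming each namespace is a flat dict of comparable values.

def merge_state(current: dict, incoming: dict) -> dict:
    merged = dict(current)
    for key, value in incoming.items():
        if key in merged:
            # conflicting writes: the maximum value is preserved
            merged[key] = max(merged[key], value)
        else:
            merged[key] = value
    return merged

# merge_state({"last_ts": 100}, {"last_ts": 90, "cursor": 5})
# -> {"last_ts": 100, "cursor": 5}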

Unpacking recursion should be configurable

In a JSON doc, beyond a nesting depth of 2 (from the root) there is usually not much useful info, but often still some nested JSON.

An example could be that an event (say "payment") has sub-entities ("order", "payment attempt", "payment method"), those sub-entities might have sub-entities (order has items), and those sub-entities have versions or validity.

What we want to do for those cases is unpack to a specified depth (passed as an arg) and load the nested data as a string, so we can keep it for record keeping/auditing and possible future use, but not unpack it, for tidiness. We could also default this to something reasonable, to avoid "too deep for recursion" type errors.
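A sketch of what the configurable depth could look like on a source; the max_table_nesting name mirrors how dlt exposes this today, but treat the exact parameter and behaviour as assumptions here.

import dlt

@dlt.source(max_table_nesting=2)
def payments_source():
    @dlt.resource
    def payments():
        yield {
            "id": 1,
            "order": {"items": [{"sku": "A", "versions": [{"v": 1}]}]},
        }
    return payments

# anything nested deeper than 2 levels would be kept as a JSON string
# instead of being unpacked into child tables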

[core] customize pipeline with hooks

Actions to be triggered by certain events in the pipeline can be delegated to the pipeline developer. Examples of events and actions:

  • a load job permanently failed. user code gets the error message, can inspect the affected file and make a decision to proceed or abort the whole load
  • a new table or new column was inferred. user may change the data type, write disposition or ignore the change (which will drop the data)
  • a variant column is being created due to inconsistent types. options as above
  • a value cannot be coerced into the column type. user can provide a custom casting function (ie. for text->datetime conversions)
  • the relational json normalizer recurses into a nested table and calls a hook. based on the table name, parent table name, nesting level etc. you can decide to create a complex type that will serialize the nested elements into json, proceed with recursion, or drop the data
  • when writing a text value into the loader file we exceed the VARCHAR(MAX) length: user can cut the string or raise an error (or ignore and try their luck)

[we need more events]

we need a clean way to define and add hook functions. investigate if https://github.com/getslash/gossip fits.

[sources] introduce structure for sources and source development

we need to think about the source interface:

  • for example, how do we obtain the iterator? how is config passed?
  • source discovery: where in the repository do sources reside
  • schemas and schema discovery: how we discover the schemas for sources that support that
  • source utilities: there are helper methods, mappings, code snippets that are useful when using the source
  • code reusability and modification: how can people modify sources, ie. to add or remove some data, change behaviour, hardcode something
  • how the dependencies to run a source are managed
  • if a source returns many tables/streams, how can we select which tables to yield? we could pass a list of table names as an input parameter + provide a mapping function that will filter unwanted tables (very inefficient)
  • in the case of the above: should we return a separate iterator for each stream or do we always return a single iterator?

organize sources: we had an idea for

  • scalevector-supported sources merged into the dlt repository
  • contrib sources that are available in a separate repository dlt-sources-contrib but reviewed and tested on some rudimentary level. those sources should be maintained by the community
  • snippet sources - sources stored as useful snippets of code to be copy-pasted into someone's pipeline, ie. for reading and transforming a certain file or calling a simple API + a simple schema etc. these would be part of the dlt-sources-contrib repo but not distributed in the contrib package

implementation:
generator functions are quite complicated python objects. we can use __name__ and __qualname__ to pass our metadata (with a decorator over the source function).
They also contain gi_ fields from which we can get information on the actual code being executed, local variables, function arguments etc.
collections.abc.Generator does not provide such information.
https://docs.python.org/3/library/inspect.html - look for the explanation of those variables, also helper functions to check generator function instances

import code
code.interact(local=locals())
# https://stackoverflow.com/questions/29488327/changing-the-name-of-a-generator
# https://bugs.python.org/issue21205
# https://docs.python.org/3/library/inspect.html
# https://stackoverflow.com/questions/58108488/what-is-qualname-in-python
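A small sketch of the introspection mentioned above: reading __name__, __qualname__ and the gi_* attributes of a live generator via the inspect module (the resource function is made up).

import inspect

def resource_rows():
    yield {"id": 1}

gen = resource_rows()
print(inspect.isgeneratorfunction(resource_rows))  # True
print(gen.__name__, gen.__qualname__)              # resource_rows resource_rows
print(inspect.getgeneratorstate(gen))              # GEN_CREATED
frame = gen.gi_frame                               # execution frame: code object, locals
print(frame.f_code.co_name, frame.f_locals)

# a decorator can attach metadata by rewriting these writable attributes
gen.__name__ = "my_table"
gen.__qualname__ = "my_source.my_table"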

[core] allow to seal table and schemas

Sealing a table or the whole schema means that:

  1. the table / schema definition is immutable
  2. data that cannot be coerced into existing tables and columns will be dropped (filtered out)
  3. provide an option to load the "bad data" to separate tables (ie. as a JSON blob)

The difficulties

  • to seal the schema, it must be known. we typically let dlt infer the schema. are there any requirements to make the sealing easier, or do we just require modifying the schema at runtime?

minimum requirements from Adrian:

At the very least, we should be able to toggle "contract_mode=On" (let's use something relatable rather than seal?)
In this mode,

  • If there is nothing yet loaded, schema can evolve and be created
  • if there is something already loaded, schema may not evolve
  • this mode can be toggled on/off to allow temporary evolution

What should happen when schema is not allowed to evolve?

  • Any operation that would cause additions to the original schema should fail
  • the data should just not be loaded
  • any operations where the performance hints would change should fail. This includes keys, performance, and nullable hints, basically all changes
  • this does not relate to dlt's normaliser - it is expected that this normaliser types the data and normalises it - this refers to the schema only.

Open questions by @sh-rp :

  • How does this integrate with providing a schema.yaml
  • Should we enable sealing / freezing individual table chains with this PR? If so, how should we do it? Via the resource decorator and if so, does this get saved into the stored schema?
  • If we load "bad data" into an additional destination, should we store the complete data there?
  • Should the trace indicate whether the schema is sealed and should we maybe add schema change output info to the trace? This would be very nice for the user playing with dlt to see what is going on under the hood imho.

create a version of `requests` that retries on certain HTTP statuses by default

Motivation

  1. Most pipelines make hundreds of requests; if there's no retry built in, the whole pipeline will fail.
  2. We want a blueprint for a "source helper" - something that is so often used that it is part of dlt core

Tasks

  1. implement a retry session using HTTPAdapter: https://majornetwork.net/2022/04/handling-retries-in-python-requests/
  2. allow configuring the timeout as well
  3. provide a default session with sane timeout and retry parameters
  4. provide a configurable session like: https://github.com/bustawin/retry-requests
  5. look at thread safety, ie. here someone explains how to make a shared connection pool which is thread safe: https://stackoverflow.com/questions/18188044/is-the-session-object-from-pythons-requests-library-thread-safe

Place the code in the dlt.sources.helpers.requests module and expose the default session as requests and the configurable session as requests_with_retry(...)

The point of the above is to provide a drop-in replacement for people making requests in the pipelines repository. They'd just import from another module but the remaining code could stay the same (see the sketch below).
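A hedged sketch of such a session built on requests' HTTPAdapter and urllib3's Retry; the default parameters and the requests_with_retry name mirror the issue, the rest is an assumption.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class TimeoutHTTPAdapter(HTTPAdapter):
    # adapter that applies a default timeout to every request
    def __init__(self, *args, timeout=30.0, **kwargs):
        self._timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        kwargs.setdefault("timeout", self._timeout)
        return super().send(request, **kwargs)

def requests_with_retry(retries: int = 5, backoff: float = 1.0, timeout: float = 30.0) -> requests.Session:
    retry = Retry(
        total=retries,
        backoff_factor=backoff,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET", "HEAD", "OPTIONS", "PUT", "DELETE"),
    )
    session = requests.Session()
    adapter = TimeoutHTTPAdapter(max_retries=retry, timeout=timeout)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# default session with sane parameters, to be exposed as `requests`
default_session = requests_with_retry()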

schema prefix should not be required

from dlt.pipeline.typing import GCPPipelineCredentials
GCPPipelineCredentials.from_services_dict(credentials, dataset_prefix='')

currently: not passing the prefix or passing an empty string results in an error
desired: Not passing a prefix, or passing None results in no prefix being used.

Schema change log access

We need to be able to access schema changes that have happened during a load. We should ideally be very informative, as the volume of schema events is low, so capturing all of them is not overdoing it.

We should capture information such as:

  • Load id or timestamp
  • namespace info: Dataset/schema, table, column name, metadata
  • PIPELINE NAME (multiple pipelines could load to the same staging layer or tables)
  • type of change: New column added, variant column added, (other?)

This information should be accessible from the pipeline after loading.

Documentation or examples should be created.

deterministic behavior for elements that are union of type T and list of type T

Often json documents contain inconsistently typed elements that may occur 1 or more times. In the case of 1 they are inserted directly as type T; if > 1 they are inserted as a list of type T.

ie. first_name may be "Juan" or ["Juan", "Pablo"]

In that case dlt builds both a flattened representation and a table representation for T and inserts into both places.

Fix the json normalizer such that if the table is present in the schema, the table is always used.
in relational.py

if isinstance(v, (dict, list)):
    if not _is_complex_type(schema, table, child_name, __r_lvl):
        # TODO: if schema contains table {table}__{child_name} then convert v into single element list
        if isinstance(v, dict):
            # flatten the dict more
            norm_row_dicts(v, __r_lvl + 1, parent_name=child_name)
        else:
            # pass the list to out_rec_list
            out_rec_list[child_name] = v

Note: in the case of automatic schema inference, all values until the first list of T is detected will be added flattened. To assure consistency such tables should be added to the schema before the first data item is normalized (see the sketch below).
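A minimal sketch of the fix: when the schema already defines a child table for this path, wrap single values in a list so both shapes normalize the same way (schema_has_table stands in for the real schema lookup).

def coerce_to_list(v, schema_has_table: bool):
    # "Juan" -> ["Juan"], {"a": 1} -> [{"a": 1}], lists pass through unchanged
    if schema_has_table and not isinstance(v, list):
        return [v]
    return v

# coerce_to_list("Juan", schema_has_table=True)            -> ["Juan"]
# coerce_to_list(["Juan", "Pablo"], schema_has_table=True) -> ["Juan", "Pablo"]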

Handle bad records in "data contract" mode

We need to be able to enforce schemas. We should be able to set a pipeline to a "fixed schema" mode or "data contract" mode.

In this mode, the schema persisted at the destination becomes the "valid" schema. If there is no schema yet, then a schema is created by parsing some data. This would enable a person wanting to track data to generate a schema from an event, and enforce it.

When attempting to load records into the frozen schema, the pipeline should divert bad records into a "bad records" table, with the info:

  • load id or timestamp
  • record as string if possible, if not, truncated to what is possible.
  • stack trace as string. If too long, truncate to what is possible, keeping the useful part of the trace.

Once the feature is implemented, please also create documentation or an example.

mind the character limit when generating table names

when table names are composed of several components (which correspond to the elements in nested json) we can easily cross the 255 character limit of the ext filesystem or the table name length limit of a particular db.

  1. prevent creating component table names longer than x characters: ie. 20 chars max and then 6 chars of hash (see the sketch below)
  2. prevent creating full table names longer than 200 chars. if this is exceeded then: root_table__depth_hash__parent_table__child_table where depth replaces all the inner tables (depth: number of replaced tables, hash: some hash on the replaced content)
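A hedged sketch of rule 1: cap each path component and append a short deterministic hash of the original name. The 20/6 numbers come from the issue; everything else is illustrative.

import hashlib

def shorten_component(name: str, max_chars: int = 20, hash_chars: int = 6) -> str:
    if len(name) <= max_chars:
        return name
    # deterministic suffix so the same long name always maps to the same short name
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:hash_chars]
    return f"{name[:max_chars]}_{digest}"

# shorten_component("a_very_long_nested_json_element_name")
# -> first 20 chars + "_" + 6-char hash, stable across runs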

Unpacking nested lists

Example:

json: [{"data": ["distinct", ["field", 970, null]]}]
py dict: [{'data':['distinct', ['field', 970, None]]}]

Perhaps a good option would be a kind of recursion depth selector, or a default way to handle this: for example, if it cannot be unpacked properly, convert it to a string.

[core] implement extractor and shift to jsonl as internal format

Implement extractor that will replace ad hoc implementation in the Pipeline

  • stream processing - never loads more than one iteration at a time into memory
  • stores data in jsonl (as a consequence) - this will allow unpacker to follow the same pattern
  • for deferred iterators uses multi threading pool
  • generates load_id that is preserved until data is loaded
  • may drop the event count from the file name
  • optional: extract_many to extract many iterators in parallel
  • moves whole folders to unpacker to be more atomic

Implementation

temp folder already exists

From Paulo

Good afternoon sir
I gave dlt a try. I first read the readme file, I think you guys highlight the core skills needed of an analyst these days, which I very much agree with!
I then went for the quickstart guide. A couple of things come to mind so far:
2. Set up a virtual environment; an Anaconda virtual environment setup guide would probably not hurt (for the Windows users out there)? No one I know would ever use the Windows terminal for python development. I ended up taking the git repository, opening it as a PyCharm project, and then opening a PowerShell terminal to install the [gcp] part of the package
On the command pipeline.create_pipeline(credentials), I get the errors...
FileExistsError: [WinError 183] Eine Datei kann nicht erstellt werden, wenn sie bereits vorhanden ist ["A file cannot be created when it already exists"]: 'C:\Users\PauloMoralesCastillo\AppData\Local\Temp\tmpaum_txq1\tmphc2v198g
it claims that the temp folder that is being created already exists. I then go to the temp files, delete this folder, and try the command again. And then, once again

FileExistsError: [WinError 183] Eine Datei kann nicht erstellt werden, wenn sie bereits vorhanden ist: 'C:\Users\PauloMoralesCastillo\AppData\Local\Temp\tmpnr34ebea\tmp8y_ya02c'
Notice that the temp folder is different. So what seems to be happening is that the command is trying to create the temp folder twice?
This is my systeminfo
OS Name: Microsoft Windows 10 Pro
OS Version: 10.0.19044 N/A Build 19044
OS Manufacturer: Microsoft Corporation
OS Configuration: Standalone Workstation
OS Build Type: Multiprocessor Free
pip freeze
cachetools==5.2.0
certifi==2022.6.15
charset-normalizer==2.0.12
google-api-core==2.8.2
google-auth==2.8.0
google-cloud-bigquery==2.34.4
google-cloud-core==2.3.1
google-crc32c==1.3.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.3
grpcio==1.43.0
grpcio-status==1.43.0
hexbytes==0.2.2
idna==3.3
json-logging==1.4.1rc0
jsonlines==2.0.0
packaging==21.3
pendulum==2.1.2
prometheus-client==0.11.0
proto-plus==1.20.6
protobuf==3.20.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==3.0.9
python-dateutil==2.8.2
python-dlt==0.1.0rc3
pytzdata==2020.1
PyYAML==5.4.1
requests==2.28.0
rsa==4.8
semver==2.13.0
sentry-sdk==1.6.0
simplejson==3.17.6
six==1.16.0
urllib3==1.26.9
Would you like me to open an issue in git with all the info? (edited)

[loader] refactor loader

  • convert loader into class in order to support many instances
  • convert all DDL and DML queries into SQL jobs. execute initial SQL job to update schema and create temp tables. execute final SQL job(s) to merge, upsert or replace data and then drop tables
  • all SQL jobs should be generated before any other job is executed

Use #34 as reference
Part of #17

[core] consider change to how we create FK relationships for nested JSON

Currently we have two ways of linking.
When no primary key is defined:

  1. we always create _record_hash which is unique and deterministic for a given row and will serve as the primary key. that applies to parent and child tables
  2. we link the child table by referring to the parent _record_hash, so no matter how deep we go, we have the same link structure.
  3. optionally the top level parent hash is propagated so each child has a link to the top table

When a primary key is defined:

  1. points 1. and 3. above still stand
  2. we keep the primary key as defined in the schema and create no additional linking

see mainnet_6_ethereum and the tables blocks -> blocks_transactions -> blocks_transactions_logs which all have a natural key which is kept

Stitch uses a similar linking concept, but it propagates the top level parent key and adds list positions:
https://www.stitchdata.com/docs/replication/loading/understanding-loading-behavior

Should we maybe follow Stitch? @adrianbr pls investigate

implement include and exclude filters in Schema as data item transformations

Row filtering can be done in an item transform and added optionally to any resource. The implementation in Schema is slow and only solves problems it has created :)

here's a GPT-4 prompt that writes the correct function

Write me a function in python that takes a nested dictionary as input. the dictionary can contain dictionaries, lists and basic types as values. the keys are string. the function takes two more arguments: a list of exclude regexes and a list of include regexes. the regex is matching paths in the dictionary. the paths are similar to json path but a separator is __ if given element has a path matching exclude regex it is removed from dictionary. however if any of the nested (child) elements of that element matches include path it should stay but other elements should be removed

testing:
the normalize tests use various advanced modes of filtering. they must pass with the new function (see the sketch below)
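A rough sketch of the item-transform approach for the simple case: drop keys whose __-joined path matches an exclude pattern unless an include pattern rescues them. The patterns, the resource and the add_map attachment are illustrative; the nested edge cases still need to be pinned down by the normalize tests.

import re
import dlt

EXCLUDES = [re.compile(r"^metadata")]
INCLUDES = [re.compile(r"^metadata__load_id$")]

def filter_keys(item: dict) -> dict:
    def _walk(node: dict, path: str) -> dict:
        out = {}
        for key, value in node.items():
            key_path = f"{path}__{key}" if path else key
            excluded = any(rx.search(key_path) for rx in EXCLUDES)
            included = any(rx.search(key_path) for rx in INCLUDES)
            if isinstance(value, dict):
                nested = _walk(value, key_path)
                # an excluded branch survives only through included descendants
                if nested or included or not excluded:
                    out[key] = nested
            elif included or not excluded:
                out[key] = value
        return out
    return _walk(item, "")

@dlt.resource
def events():
    yield {"id": 1, "metadata": {"load_id": "123", "debug": "drop me"}}

filtered = events().add_map(filter_keys)   # -> {"id": 1, "metadata": {"load_id": "123"}}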

[core] combine load files if for the same table

if the loader client (bq/redshift/postgres) allows it (ie. file size limit, query size limit not exceeded), files in a load package belonging to the same table can be combined

  1. jsonl can always be combined
  2. csv may be combined if the headers (first line) match
  3. insert files should not be combined

[source] demo of DLT -> prodigy interface

Using the existing rasa source, show how we can feed the rasa tracker store into prodigy, ie. in a notebook

  • get rasa data ie. from file or from sql store
  • use rasa source to yield user events
  • convert user events into format understood by prodigy (get text, intent as label, confidence in meta etc.)
  • run prodigy annotation task on the source

[core] add data types autodetection

Add the ability to detect data types from the content of the data. The main use case is to detect timestamps from strings that are ISO date strings.

Implementation

  • enable a list of autodetectors in the normalization part of the schema
  • enable the timestamp autodetectors by default
  • make the autodetection system easily extendable

Details
The default autodetection section looks as follows:

normalizers:
  detections:
  - timestamp
  - iso_timestamp

where timestamp detects UNIX floats (within a year of the current time) and iso_timestamp detects based on the ISO format (and similar).

Autodetectors are names of functions in the dlt.schema.detection module, so adding more detectors is trivial (see the sketch below).
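A hedged sketch of what the two default detectors could look like; the real functions in the detections module may differ in naming, return values and tolerance windows.

from datetime import datetime, timedelta, timezone

def timestamp(value):
    # UNIX float/int within roughly a year of now -> detected as timestamp
    if isinstance(value, (int, float)):
        now = datetime.now(timezone.utc).timestamp()
        if abs(value - now) < timedelta(days=366).total_seconds():
            return "timestamp"
    return None

def iso_timestamp(value):
    # ISO 8601 string -> detected as timestamp
    if isinstance(value, str):
        try:
            datetime.fromisoformat(value)
            return "timestamp"
        except ValueError:
            return None
    return None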

[loader] implement `replace table` write disposition

  • we should be able to specify how the content is stored by the loader

  • we implement two types:

  • append - we add more data to the existing table

  • replace - we replace the content of the table

append is already implemented
replace can easily be done on BigQuery with WriteDisposition.WRITE_TRUNCATE and on redshift by truncating the table (see the sketch at the end of this issue)

assumption
We can assume that the unpacker will produce a single load file for replace. that significantly simplifies the implementation

aux tasks

  • loading clients should expose more capabilities: preferred file format, supported formats, etc.
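A sketch of how the two dispositions might be declared per resource, mirroring the current dlt.resource API; resource names and data are made up.

import dlt

@dlt.resource(write_disposition="append")
def events():
    # each run adds more rows to the existing table
    yield {"id": 1}

@dlt.resource(write_disposition="replace")
def dimensions():
    # each run replaces the full content of the table
    yield {"code": "A", "label": "Alpha"}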

Airflow deployment

Create an Airflow deployment similar to the current GitHub Actions deployment. The deployment should support

    • generation of a dag running the pipeline
    • ideally this dag should contain tasks representing resources, in the correct dependency order. If not currently possible, start with just running the pipeline in a task before defining next steps.
    • credentials should be passed into airflow, or the appropriate user actions/guidance should be offered on the CLI, similar to the other deployment command.

column names should be alphanumeric

a best practice for table and column names is to keep them alphanumeric with underscores.

Each database system may support quoted special characters in column names to some extent, but it is best to avoid them.

A solution could be converting "illegal" characters into a numeric code.

example: <class 'google.api_core.exceptions.BadRequest'>
400 Illegal field name: stats__execution__num_by_latency__<_1
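A minimal sketch of that option: replace every character outside [a-zA-Z0-9_] with its hex code so names stay legal and still distinguishable; the exact encoding scheme is an assumption.

import re

def normalize_identifier(name: str) -> str:
    # "<" becomes "_x3c_", spaces become "_x20_", etc.
    return re.sub(
        r"[^a-zA-Z0-9_]",
        lambda m: f"_x{ord(m.group(0)):02x}_",
        name,
    ).lower()

# normalize_identifier("stats__execution__num_by_latency__<_1")
# -> "stats__execution__num_by_latency___x3c__1"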

[core] schema overhaul

we should overhaul the existing schema to support the following

  • add precision and scale to applicable types -> moved to #39
  • allow adding descriptions to tables and columns
  • review all hints and remove/add -> postponed
  • allow excluding fields and tables
  • allow sealing table schemas so changes are ignored -> #40
  • configure the unpacker: which fields are propagated to child tables, ??
  • configure the loader: write disposition (#17 )
  • methods to normalize names and generate paths should be pluggable and available in the schema
  • return child - parent relationships from the normalizer and store them in the schema
  • all filters, preferred types and hints without the re: prefix are treated as regexes ^...$ for easy specification of filters without knowledge of regexes
  • rename the autogenerated fields and tables: add a consistent _dlt prefix

[core] data hashing and anonymization

a mechanism to remove or anonymize parts of the data - before they reach the target.

  • remove secret data ie access cookies
  • GDPR compliance

implement in the schema

  • include, exclude hints
  • hash value hint -> add "hash" option to filters

The hashing

  1. should be part of "filter_rows" method in schema
  2. should be deterministic
  3. should use salt, if salt is not available, it should use table name as salt (yeah weak, but better than nothing)
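A hedged sketch of deterministic, salted hashing with the table-name fallback; the function name and digest choice are assumptions.

import hashlib
from typing import Optional

def hash_value(value, salt: Optional[str] = None, table_name: str = "") -> str:
    # fall back to the table name when no salt is configured
    effective_salt = salt if salt is not None else table_name
    payload = f"{effective_salt}:{value}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

# the same input + salt always yields the same hash, so joins on the
# hashed column still work across loads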

[source] implement relational database source

Hello folks !

First I want to say thank you for your work on dlt. The idea of a "dbt for EL" + an integration with dbt is just what Data Engineering needed 🙏🏽

Description

A lot of companies don't have an API layer on top of their application databases. They rely on SQL queries to extract transactional data from application databases before loading it to the data warehouse. Maybe there is such a feature but I don't understand how to use it with dlt.

Who will this benefit?

Anyone having relational databases as data sources & relying on "SQLAlchemy-like" connection & SQL to extract data.

Thank you !
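A hedged sketch of what such a source could look like with SQLAlchemy: connect with a standard connection string, stream rows and hand them to a dlt resource. The table, query and secret layout are placeholders.

import dlt
import sqlalchemy as sa

@dlt.resource(name="orders", write_disposition="append")
def orders(connection_string: str = dlt.secrets.value):
    engine = sa.create_engine(connection_string)
    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            sa.text("SELECT * FROM orders WHERE updated_at > :since"),
            {"since": "2023-01-01"},
        )
        for row in result.mappings():
            yield dict(row)   # each row becomes one item in the "orders" table

# pipeline = dlt.pipeline(destination="duckdb", dataset_name="app_db")
# pipeline.run(orders())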

[singer] implement singer source

Implement singer source as follows

  • source requires venv with singer dependency (#14 )
  • singer is run through control pipeline (#15 )
  • records are yielded with stream name as table name
  • state is gathered and written at the end
  • FULL_TABLE replication is supported (#17 )
  • INCREMENTAL with bookmark is supported on the source level - records will be filtered out based on the column value in state

[core] refactor unpacker

The unpacker must be refactored so that:

  1. it is a class that can be instantiated several times
  2. it deals with folders where files associated with load_ids reside
  3. it uses the parser configured in the schema
  4. it uses the naming convention configured in the schema (normalization, path generation for tables and nested entities etc.)
  5. it implements PoolRunnable to be able to be run
  6. many instances of Prometheus metrics in different namespaces may exist
  7. it processes data from jsonl one document at a time
  8. for tables that are marked as merge or replace it must merge the files corresponding to a single table

this also means

  • small refactor of data writers (to be able to merge)
  • refactor of schema (part of #18 )
  • pool runner must support PoolRunnable
  • solves #11

[singer] infer schema from singer catalogue

Often the unpacker automatically produces much better schemas than those included in singer, but the option should exist

  • infer write disposition for tables (#17 )
  • infer primary keys for tables
  • infer the replication key as a sorted hint (why not :))
  • convert names of fields and tables and derive child tables in a way compatible with the current unpacker (parser, table naming function)

Optional:

  • convert JSON SCHEMA with singer types to a DLT schema
  • remove schema elements not selected in the breadcrumb

user log /callback log

We need user-readable logs

  • what loaded (source) and where (hostname/destination/project/dataset)
  • ideally the nr of schema changes

and callback parameters

  • Pipeline/ task name

  • env name - In airflow you have a base url that tells you which instance is failing.

  • logs url

  • Schema changes

  • error message

  • success message:

    • how many rows were loaded to what table
    • if run metadata is logged persistently then give a summary of whether the volume of rows is normal (compare to the last runs)

Partial success:

  • unless intentionally configured, this is a non-atomic load and should be avoided.
  • report failures - offer way to retry? offer way to

Retry:

  • keep track of retries to be able to report them on final fail
  • optionally report failures of retries. In some patterns you will want to turn off retry notification and only notify failure.

[data lineage] Showing the user what tables & corresponding columns got changed while loading into the database.

DLT will change any column and table name to lower case and will also replace spaces with '_'. So as not to confuse the user of the pipeline, it might be helpful to keep track of these changes and show them after the pipeline has finished loading data into the database. This could be done in load info (showing just how many name changes occurred), by logging info every time there is a name change, and this information should also be in the schema after the pipeline has finished loading.
Additionally, if a column only has NULL values for all rows, it will be dropped. I think this should also be shown to the user. Basically, any change of this sort where a user expects thing A to be loaded and thing B (or nothing) gets loaded should be shown to the user.

[core] add precision and scale to applicable types

add precision and scale settings to the following types

  • represent precision and scale as column hints for decimal types
  • precision for strings and binaries is the number of characters/bytes
  • precision for integers is the number of bits
  • precision for time is the number of digits of the fractional second (3 - milli, 6 - micro, 9 - nano)
  • accept decimal(9,1) from code and, when loading the schema, translate that into separate hints (see the sketch below)
  • we can leave the validation of the precision to the database engine - during schema creation
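A small sketch of how "decimal(9,1)" might translate into separate column hints; the key names follow the description above and are assumptions, not the final schema format.

# decimal(9,1) split into separate hints on the column
column = {
    "name": "amount",
    "data_type": "decimal",
    "precision": 9,   # total number of digits
    "scale": 1,       # digits after the decimal point
}

# for strings the precision is a character count
text_column = {
    "name": "comment",
    "data_type": "text",
    "precision": 255,
}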

generate `dlt` resources from the openAPI specification

Motivation
Most of the APIs have their openAPI definitions accessible, and it is a common approach to generate python clients automatically. We want to do the same and generate the dlt.resources corresponding to the API endpoints.
The intention is to generate initial code for a dlt user that needs to access a new API. As the first test cases pick Pipedrive and Hubspot (which we know generate very nice datasets with dlt).
For more openAPI specs look in rapidapi

As mentioned, python clients are generated this way. Here's one that is template based: https://github.com/openapi-generators/openapi-python-client. We may be able to use the same approach

Requirements

    • for all the GET resources generate corresponding dlt.resources. convert the path and query parameters into input arguments, generate typing and defaults (I believe you can copy this from the python client generating code)
    • try to figure out the authentication method in the open api spec. provide a helper method for each authentication type, pass the required authentication elements (ie. bearer token, api key and secret or whatever it is) to the dlt.resource methods and mark them with dlt.secrets.value so they are automatically injected.
    • most of the apis define the returned documents with json schemas. convert the types into TTableSchema so we have correct types for table columns. (let's discuss the nested tables later)
    • when a list of objects is returned we should be able (optionally - possibly a flag when generating code) to still yield items one by one.
    • the base URL for the API should be taken from the openapi servers property when available. When there are multiple servers, create a mapping and accept as an argument/config prop either a server name (e.g. production) or a url https://example.com/api

Additional Heuristics

    • most of the apis let the user filter for lists of object ids and then provide an endpoint to get those objects' details. ideally we would create a dlt.transformer that allows pipelining data from the lists to those "enriching" endpoints. let's find a way to do it at least in some of the cases.
    • it would be cool to figure out which of the fields in the returned data are unique identifiers and add primary_key hints
    • it would be cool to figure out how data is paginated when a list of items is requested and generate paginator code and dlt.resources that can take N items or all items across all pages. there are only a few pagination types. alternatively we could pass options like: this is pagination of type x and this is the next_page element etc.

Deliverable:

  • as an extension to the dlt init command.
  • we'll start with a separate branch and the generator as a standalone script, then merge when it is useful (see the sketch below)
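A hedged sketch of the kind of resource the generator could emit for a GET endpoint with bearer auth: path/query parameters become typed arguments and the token is injected via dlt.secrets.value. The endpoint, pagination keys and names are placeholders.

from typing import Iterator, Optional
import dlt
import requests

@dlt.resource(name="deals", write_disposition="append")
def deals(
    api_token: str = dlt.secrets.value,   # injected from secrets
    limit: int = 100,
    start: Optional[int] = 0,
) -> Iterator[dict]:
    url = "https://api.example.com/v1/deals"
    headers = {"Authorization": f"Bearer {api_token}"}
    while True:
        resp = requests.get(url, headers=headers, params={"limit": limit, "start": start})
        resp.raise_for_status()
        page = resp.json()
        # yield items one by one even though the API returns a list
        yield from page["data"]
        next_start = page.get("next_start")
        if next_start is None:
            break
        start = next_start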

[dbt] generate dbt package for a pipeline

Motivation
dlt automates extracting and loading data into a warehouse. it also infers the table schema from the data and allows adding various hints such as uniqueness and nullability. the loading is "atomic" thanks to the "load_id" mechanism, where any new piece of data is marked with a load_id which is flagged as valid after all the data was correctly loaded (see here https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/staging/load_ids.sql and https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/views/_loads.sql). The same mechanism allows chaining transformations for every load_id.

dbt is a popular way of defining transformations and dlt implements an extremely easy-to-use runner (see #28 ) that executes any dbt package in an isolated python environment. it also manages the dbt profiles and credentials so the same pipeline definition can be used both for loading and transforming.

we want to make this even easier by generating an initial package that will contain all the models and support the chaining mechanism. the user just needs to add transformations for their tables. Here is an example of a dbt package that was started from such a template: https://github.com/dlt-hub/rasa_semantic_schema

Requirements

    • the generated package must work with the dbt runner as implemented in #28
    • You also need to create the dbt template package above
    • null and unique tests should be included
    • should work with bigquery, redshift, postgres, duckdb (in fact the materializations for the _dlt_loads table are so simple that they should work everywhere)

Implementation
Let's start from a standalone script that generates a dbt package from the Pipeline instance passed in. The instance will contain all relevant schemas with table definitions.

[dbt] allow executing dbt packages via dbt runner

Make the dbt runner work again. The runner present in https://github.com/dlt-hub/dlt/tree/devel/dlt/dbt_runner works with dbt 1.0 and operates by calling the dbt code directly, not via the command line. Let's try to keep that functionality with several enhancements.

    • allow for easy binding of dbt transformations to dlt pipelines
    • the runner should work within a venv as implemented in #14 and #15. if a venv is not explicitly passed or does not exist in the passed directory, a new venv should be created. if the venv is created in a temporary dir, it should be removed after use
    • a correct dbt version with the correct dbt package(s) should be installed as a dependency.
    • the information above comes from the destination that must be specified in the Pipeline
    • create a temporary profiles.yml that takes credentials from env. create the required os.environ from the credentials present in the Pipeline. (the credentials are a Python dict) - creation is trivial
    • convert the dbt runner into a class like Normalize and use config injection for DBTRunnerConfiguration
    • implement tests. tbd.

empty rows get dropped when normalizing and loading

If an empty row (ie. {} or {"value": None}) is passed to the pipeline run method, it gets dropped and nothing is loaded.
Correct behavior: load a row with all columns set to NULL (except the dlt added ones).

The bug applies only to rows for existing tables with existing columns. In the case of non-existing tables and table stubs (without any columns), such empty rows should not generate records to load.

This most probably happens in the normalize stage, where rows get dropped if all data is filtered out by exclude filters.

Nullable column causes pipeline to fail in certain cases

Hi hi

I'm just starting my contributions to dlt and ran into my first issue.

It happens when a column is null in the first >=5000 rows (5000 matches the buffered writer limit);
a following row which has a non-null value for the column then causes the pipeline to fail.

This pipeline reproduces the issue:

import dlt
import random

@dlt.source
def dummy():
    return [table_rows()]


@dlt.resource(write_disposition='append')
def table_rows():
    for _ in range(5000):
        yield dict(display_name=None, random_number=random.random())
    yield dict(display_name='Dummy name', random_number=random.random())


def main() -> None:
    p = dlt.pipeline(
        pipeline_name='dummy', destination='postgres', dataset_name='dummy_data', full_refresh=False
    )

    info = p.run(dummy())
    print(info)


if __name__ == '__main__':
    main()

Traceback:

Traceback (most recent call last):
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/runners/pool_runner.py", line 79, in run_pool
    run_metrics = run_f.run(cast(TPool, pool))
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 257, in run
    self.spool_schema_files(schema_name, list(files_in_schema))
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 235, in spool_schema_files
    self.spool_files(schema_name, load_id, map_parallel_f, files)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 195, in spool_files
    total_items, schema_updates, chunk_files = map_f(schema, load_id, files)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 171, in map_single
    processed_chunk = Normalize.w_normalize_files(
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 120, in w_normalize_files
    load_storage.close_writers(load_id)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/storages/data_item_storage.py", line 33, in close_writers
    writer.close_writer()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 77, in close_writer
    self._flush_and_close_file()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 105, in _flush_and_close_file
    self._flush_items()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 100, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 151, in write_data
    write_row(rows[-1])
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 136, in write_row
    raise
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 136, in write_row
    raise
  File "/usr/lib/python3.10/bdb.py", line 90, in trace_dispatch
    return self.dispatch_line(frame)
  File "/usr/lib/python3.10/bdb.py", line 115, in dispatch_line
    if self.quitting: raise BdbQuit
bdb.BdbQuit
Traceback (most recent call last):
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 133, in write_row
    output[self._headers_lookup[n]] = self._caps.escape_literal(v)
KeyError: 'display_name'

Not sure what a solution is yet.
Maybe a new file needs to be started when there are schema changes or something along those lines?
