dlt-hub / dlt
data load tool (dlt) is an open source Python library that makes data loading easy
Home Page: https://dlthub.com/docs
License: Apache License 2.0
Motivation
Provided that #118 is implemented, we want to add simple materializations for the tables defined in schemas of a given pipeline. Those simple materializations will make sure that data is deduplicated and merged according to the hints below.
You can think of it as an extension of the initial models defined in the #118 initial dbt package.
Requirements
- primary_key hint: we want a materialization that deduplicates the source table
- merge_key hint: we want a materialization that merges the data according to the key
- load_id for incremental loading
- destinations: bigquery, redshift, postgres, duckdb
Background
Implement Snowflake destination based on PUT stage. It will be quite similar to the BigQuery
implementation (just simpler).
Tasks
- jsonl as preferred type
- PUT to stage
- Authentication derived from ConnectionStringCredentials, with additional fields: warehouse, role
Tests
all in all writing the test suite is more work than the code here
Motivation
low prio: I think the number of professionals who care about date storage size and care to manage it separately is in decline so consider this a low prio unless we learn otherwise. I think most people would prefer utc timestamps that they can localize in their frontend app.
why date?: Currently we only support timestamp. In most BI databases DATE costs half as much to store, and joining on a native date can be more than 2x cheaper than joining on a calculated date (compressed value join vs calculated value join).
Tasks
- add the date type to the py_type_to_sc_type function (there's a TODO already), optional
- add type coercions in the coerce_value function: from ISO string and from timestamp without the days part; also add coercion from date to timestamp
Tests
- py_type_to_sc_type tests
- coerce_value tests
- test_load_with_all_types and the TABLE_ROW fixture: add datetime to the tested columns
Implement a module that allows creating Python virtual environments and executing processes within them
requirement format example:
https://gist.github.com/mpurdon/be7f88ee4707f161215187f41c3077f6
implement proper state management in Pipeline
Implementation
In a json doc, beyond a nesting depth of 2 (from the root) there is usually not much useful info, but often still some nested jsons.
An example could be that an event (say "payment") has sub-entities ("order", "payment attempt", "payment method"), those sub-entities might have sub-entities (order has items) and those sub-entities have versions or validity.
What we want to do for those cases is unpack to a specified depth (passed as an arg) and load the nested data as a string, so we can have it for record keeping/auditing and possible future use, but not unpack it, for tidiness. We could default this to something reasonable too, to avoid "too deep for recursion" type errors.
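Below is a minimal sketch of that idea, assuming a simple recursive flattener (the function name and the __ separator convention are illustrative, not dlt internals): anything below max_depth is serialized to a JSON string instead of being unpacked.

import json
from typing import Any, Dict

def unpack_with_max_depth(doc: Dict[str, Any], max_depth: int = 2, _level: int = 0) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for key, value in doc.items():
        if isinstance(value, dict) and _level < max_depth:
            # still within the allowed depth: keep flattening
            for child_key, child_value in unpack_with_max_depth(value, max_depth, _level + 1).items():
                out[f"{key}__{child_key}"] = child_value
        elif isinstance(value, (dict, list)):
            # too deep (or a list): keep the raw payload as a string for auditing
            out[key] = json.dumps(value)
        else:
            out[key] = value
    return out

# example: sub-entities below depth 2 stay as JSON strings
row = unpack_with_max_depth({"payment": {"order": {"items": [{"sku": "a"}]}}}, max_depth=2)
print(row)  # {'payment__order__items': '[{"sku": "a"}]'}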
Actions to be triggered by certain events in the pipeline can be delegated to the pipeline developer. Examples of events and actions:
[we need more events]
we need a clean way to define and add hook functions. investigate if https://github.com/getslash/gossip fits.
we need to think about the source interface,
organize sources: we had an idea for a dlt repository and a dlt-sources-contrib repository
- dlt-sources-contrib sources are reviewed and tested on some rudimentary level; those sources should be maintained by the community
- sources stay in the dlt-sources-contrib repo but are not distributed in a contrib package
implementation:
generator functions are quite complicated python objects. we can use __name__ and __qualname__ to pass our metadata (with a decorator over the source function)
They also contain gi_ fields from which we can get information on the actual code being executed, local variables, function arguments etc.
collections.Generator does not provide such information
https://docs.python.org/3/library/inspect.html - look there for an explanation of those variables, and also for helper functions to check generator function instances
import code
code.interact(local=locals())
# https://stackoverflow.com/questions/29488327/changing-the-name-of-a-generator
# https://bugs.python.org/issue21205
# https://docs.python.org/3/library/inspect.html
# https://stackoverflow.com/questions/58108488/what-is-qualname-in-python
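A minimal sketch of the decorator idea, assuming we only want to carry a custom name (the decorator name is made up): generator objects expose writable __name__ and __qualname__ (see bpo-21205 above), so both the function and the generators it produces can carry the metadata.

import functools
from typing import Any, Callable, Iterator

def with_source_name(name: str) -> Callable[[Callable[..., Iterator[Any]]], Callable[..., Iterator[Any]]]:
    def decorator(gen_func: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
        @functools.wraps(gen_func)
        def wrapper(*args: Any, **kwargs: Any) -> Iterator[Any]:
            gen = gen_func(*args, **kwargs)
            # writable on generator objects since Python 3.5 (bpo-21205)
            gen.__name__ = name
            gen.__qualname__ = name
            return gen
        wrapper.__name__ = name
        wrapper.__qualname__ = name
        return wrapper
    return decorator

@with_source_name("my_source")
def rows() -> Iterator[dict]:
    yield {"value": 1}

print(rows().__name__)  # "my_source"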
Sealing a table or the whole schema means that:
The difficulties: dlt is designed to infer the schema. Are there any requirements to make the sealing easier, or do we just require the schema to be modified at runtime?
Minimum requirements from Adrian:
At the very least, we should be able to toggle "contract_mode=On" (let's use something relatable rather than seal?)
In this mode,
What should happen when schema is not allowed to evolve?
Open questions by @sh-rp :
Motivation
Tasks
- use HTTPAdapter for retries: https://majornetwork.net/2022/04/handling-retries-in-python-requests/ (see the sketch below)
- place the code in the dlt.sources.helpers.requests module and expose the default session as requests and a configurable session as requests_with_retry(...)
- the point of the above is to provide a drop-in replacement for people making requests in the pipelines repository: they'd just import from some other module but the remaining code could stay the same.
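A minimal sketch of what the configurable session could look like (function name and defaults are assumptions, not the final dlt API), using requests' HTTPAdapter with urllib3's Retry:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_with_retry(retries: int = 5, backoff_factor: float = 1.0) -> requests.Session:
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("HEAD", "GET", "OPTIONS"),
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# drop-in usage: session.get(...) behaves like requests.get(...) but retries transient errors
session = requests_with_retry()
# resp = session.get("https://api.example.com/items")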
from dlt.pipeline.typing import GCPPipelineCredentials
GCPPipelineCredentials.from_services_dict(credentials, dataset_prefix='')
currently: not passing the prefix, or passing an empty string, results in an error
desired: not passing a prefix, or passing None, results in no prefix being used.
some info on alchemy bulk loads
https://stackoverflow.com/questions/3659142/bulk-insert-with-sqlalchemy-orm
(it actually uses core)
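For reference, a minimal Core-level bulk insert sketch (table, columns and connection string are made up): passing a list of dicts to a single execute() lets SQLAlchemy issue an executemany instead of per-row ORM inserts.

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert

engine = create_engine("postgresql://user:pass@localhost/dummy")
metadata = MetaData()
events = Table(
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)
metadata.create_all(engine)

rows = [{"name": f"event_{i}"} for i in range(10_000)]
with engine.begin() as conn:
    # one statement, many parameter sets
    conn.execute(insert(events), rows)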
We need to be able to access schema changes that have happened during a load. We should ideally be very informative, as the volume of schema events is low, so capturing all of them is not overdoing it.
We should capture information such as:
This information should be accessible from the pipeline after loading.
Documentation or examples should be created.
Often json documents contain inconsistently typed elements that may occur 1 or more times. In case of 1 occurrence they are inserted directly as type T, if > 1 they are inserted as a list of type T.
i.e. first_name may be "Juan" or ["Juan", "Pablo"]
in that case DLT builds both the flattened representation and the table representation for T and inserts in both places.
Fix the json normalizer such that if the table is present in the schema, the table is always used.
in relational.py:
if isinstance(v, (dict, list)):
    if not _is_complex_type(schema, table, child_name, __r_lvl):
        # TODO: if schema contains table {table}__{child_name} then convert v into single element list
        if isinstance(v, dict):
            # flatten the dict more
            norm_row_dicts(v, __r_lvl + 1, parent_name=child_name)
        else:
            # pass the list to out_rec_list
            out_rec_list[child_name] = v
Note: in case of automatic schema inference, all values until the first list of T is detected will be added flattened. To assure consistency, such tables should be added to the schema before the first data item is normalized.
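A minimal, self-contained sketch of the TODO above (names are hypothetical, not the actual relational.py internals): if the schema already has a child table for a field, coerce a single value into a one-element list so it always lands in that table.

from typing import Any, Dict, Set

def coerce_to_child_table(row: Dict[str, Any], table: str, known_tables: Set[str]) -> Dict[str, Any]:
    for key, value in list(row.items()):
        child_table = f"{table}__{key}"
        if child_table in known_tables and not isinstance(value, list):
            # the child table exists, so even a single "Juan" becomes ["Juan"]
            row[key] = [value]
    return row

# usage: first_name was seen as a list before, so a later single value is wrapped
known = {"users__first_name"}
print(coerce_to_child_table({"first_name": "Juan"}, "users", known))  # {'first_name': ['Juan']}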
We need to be able to enforce schemas. We should be able to set a pipeline to a "fixed schema" mode or "data contract" mode.
In this mode, the schema persisted at the destination becomes the "valid" schema. If there is no schema yet, then a schema is created by parsing some data. This would enable a person wanting to track data to generate a schema from an event, and enforce it.
When attempting to load records into the frozen schema, the pipeline should route bad records into a "bad records" table, with the relevant info.
Once the feature is implemented, please also create documentation or an example.
when table names are composed of several components (which correspond to the elements in nested json) we can easily cross the 255 character limit of the ext filesystem or the table name length limit of a particular db.
Example:
json: [{"data": ["distinct", ["field", 970, null]]}]
py dict: [{'data':['distinct', ['field', 970, None]]}]
perhaps a good option would be a kind of recursion depth selector, or a sensible default to handle this; for example, if a name cannot be handled, convert the nested value to a string.
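One possible way to keep such names within a limit (purely illustrative, not a dlt feature): keep a readable prefix and append a short deterministic hash so different long names cannot collide after truncation.

import hashlib

def shorten_identifier(name: str, max_length: int = 63) -> str:
    if len(name) <= max_length:
        return name
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    # leave room for the separator and the 8-char digest
    return f"{name[: max_length - 9]}_{digest}"

long_name = "data__distinct__field__nested__even__deeper__and__deeper__and__deeper"
print(shorten_identifier(long_name, max_length=63))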
Implement an extractor that will replace the ad hoc implementation in the Pipeline
- load_id that is preserved until data is loaded
- extract_many to extract many iterators in parallel
Implementation
From Paulo
Good afternoon sir
I gave dlt a try. I first read the readme file, I think you guys highlight the core skills needed of an analyst these days, which I very much agree with!
I then went for the quickstart guide. A couple of things come to mind so far:
2. Set up a virtual environment; an anaconda virtual environment setup guide probably would not hurt (for the Windows users out there)? No one I know would ever use the Windows terminal for python development. I ended up taking the git repository, opening it as a PyCharm project, and then opening a PowerShell terminal to install the [gcp] part of the package.
On the command pipeline.create_pipeline(credentials), I get the errors...
FileExistsError: [WinError 183] A file cannot be created when that file already exists: 'C:\Users\PauloMoralesCastillo\AppData\Local\Temp\tmpaum_txq1\tmphc2v198g'
it claims that the temp folder that is being created already exists. I then go to the temp files, delete this folder, and try the command again. And then, once again:
FileExistsError: [WinError 183] A file cannot be created when that file already exists: 'C:\Users\PauloMoralesCastillo\AppData\Local\Temp\tmpnr34ebea\tmp8y_ya02c'
Notice that the temp folder is different. So what seems to be happening is that the command is trying to create the temp folder twice?
This is my systeminfo
OS Name: Microsoft Windows 10 Pro
OS Version: 10.0.19044 N/A Build 19044
OS Manufacturer: Microsoft Corporation
OS Configuration: Standalone Workstation
OS Build Type: Multiprocessor Free
pip freeze
cachetools==5.2.0
certifi==2022.6.15
charset-normalizer==2.0.12
google-api-core==2.8.2
google-auth==2.8.0
google-cloud-bigquery==2.34.4
google-cloud-core==2.3.1
google-crc32c==1.3.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.3
grpcio==1.43.0
grpcio-status==1.43.0
hexbytes==0.2.2
idna==3.3
json-logging==1.4.1rc0
jsonlines==2.0.0
packaging==21.3
pendulum==2.1.2
prometheus-client==0.11.0
proto-plus==1.20.6
protobuf==3.20.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==3.0.9
python-dateutil==2.8.2
python-dlt==0.1.0rc3
pytzdata==2020.1
PyYAML==5.4.1
requests==2.28.0
rsa==4.8
semver==2.13.0
sentry-sdk==1.6.0
simplejson==3.17.6
six==1.16.0
urllib3==1.26.9
Would you like me to open an issue in git with all the info? (edited)
Currently the US location is hardcoded. Let it be changeable in the settings.
in order to run the singer source or the dbt runner we need to be able to launch a process in a venv (#14) and send/receive data from it
e.g. https://github.com/datamill-co/singer-runner/tree/master/singer_runner
Currently we have two ways of linking:
- no primary key is defined: we generate _record_hash, which is unique and deterministic for a given row and will serve as the primary key. That applies to parent and child tables: children link to the parent's _record_hash, so no matter how deep we go we have the same link structure.
- primary key is defined: see mainnet_6_ethereum and the tables blocks -> blocks_transactions -> blocks_transactions_logs, which all have a natural key which is kept.
Stitch uses a similar linking concept but it propagates the top-level parent key and adds list positions:
https://www.stitchdata.com/docs/replication/loading/understanding-loading-behavior
should we maybe follow Stitch? @adrianbr pls investigate
row filtering can be done in an item transform and be added optionally to any resource. the implementation in Schema is slow and only solves problems it has created :)
here's a GPT-4 prompt that writes a correct function:
Write me a function in python that takes a nested dictionary as input. the dictionary can contain dictionaries, lists and basic types as values. the keys are strings. the function takes two more arguments: a list of exclude regexes and a list of include regexes. the regex is matching paths in the dictionary. the paths are similar to json path but the separator is __. if a given element has a path matching an exclude regex it is removed from the dictionary. however if any of the nested (child) elements of that element matches an include path it should stay, but the other elements should be removed
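A minimal sketch of such a function (names are mine, and the exact include/exclude semantics of dlt's normalize filters may differ in detail; lists are treated as leaf values for brevity):

import re
from typing import Any, Dict, List

def filter_row(row: Dict[str, Any], excludes: List[str], includes: List[str], _path: str = "") -> Dict[str, Any]:
    def matches(path: str, patterns: List[str]) -> bool:
        return any(re.search(p, path) for p in patterns)

    out: Dict[str, Any] = {}
    for key, value in row.items():
        path = f"{_path}__{key}" if _path else key
        if isinstance(value, dict):
            # recurse first so included grandchildren can rescue an excluded parent
            child = filter_row(value, excludes, includes, path)
            if child or not matches(path, excludes):
                out[key] = child
        else:
            if matches(path, excludes) and not matches(path, includes):
                continue
            out[key] = value
    return out

row = {"user": {"password": "x", "name": "Juan"}, "value": 1}
# drop everything under user except user__name
print(filter_row(row, excludes=[r"^user"], includes=[r"^user__name$"]))  # {'user': {'name': 'Juan'}, 'value': 1}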
testing:
the normalize tests are using various advanced modes of filtering. they must pass with the new function
if the loader client (bq/redshift/postgres) allows it (i.e. the file size limit and query size limit below), files in a load package belonging to the same table can be combined
Using the existing rasa source, show how we can feed the rasa tracker store into prodigy, e.g. in a notebook
Add the ability to detect data types from the content of the data. The main use case is to detect timestamps from strings that are ISO date strings.
Implementation
detection is part of the normalization section of the schema
Details
The default autodetection section looks as follows:
normalizers:
  detections:
  - timestamp
  - iso_timestamp
where timestamp detects from UNIX floats (within a year of the current time) and iso_timestamp detects based on ISO format (and similar).
autodetectors are names of functions in the dlt.schema.detection module, so adding more detectors is trivial.
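For illustration, a minimal sketch of what such detector functions could look like (the real dlt.schema.detection module may use different names and rules):

import datetime
import re

ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")

def is_timestamp(value: object) -> bool:
    # UNIX epoch seconds within roughly a year of "now"
    if not isinstance(value, (int, float)):
        return False
    now = datetime.datetime.now(datetime.timezone.utc).timestamp()
    return abs(float(value) - now) < 365 * 24 * 3600

def is_iso_timestamp(value: object) -> bool:
    return isinstance(value, str) and ISO_RE.match(value) is not None

print(is_iso_timestamp("2023-01-01T12:00:00"))  # True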
we should be able to specify how the content is stored in the loader.
we implement two types:
- append: we add more data to the existing table
- replace: we replace the content in the table
append is already implemented
replace can be easily done in BigQuery with WriteDisposition.WRITE_TRUNCATE and for redshift by truncating the table (see the sketch below)
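A minimal sketch of the BigQuery side (dataset, table and file names are made up): WRITE_TRUNCATE replaces the table content atomically with the loaded file.

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open("load_package/events.jsonl", "rb") as f:
    job = client.load_table_from_file(f, "my_dataset.events", job_config=job_config)
job.result()  # wait for completion; the previous table content is gone after this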
assumption
We can assume that the unpacker will produce a single load file for replace. That significantly simplifies the implementation.
aux tasks
Create an airflow deployment similar to the current git actions deployment. The deployment should support
a best practice for table and column names is to keep them alphanumeric with underscores.
Each database system may support quoted special characters in column names to some extent, but it is best to avoid them.
A solution could be converting "illegal" characters into a numeric code.
example: <class 'google.api_core.exceptions.BadRequest'>
400 Illegal field name: stats__execution__num_by_latency__<_1
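A minimal sketch of the numeric-code idea (not dlt's actual normalizer): replace each "illegal" character with its code point so the name stays a legal, collision-resistant identifier.

import re

def escape_illegal_chars(name: str) -> str:
    return re.sub(r"[^a-zA-Z0-9_]", lambda m: f"_x{ord(m.group(0)):02x}_", name)

# the failing BigQuery field from the example above becomes a legal identifier
print(escape_illegal_chars("stats__execution__num_by_latency__<_1"))
# -> stats__execution__num_by_latency___x3c__1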
we need to have some kind of clear message as to what changed (table, change, load_id, time) that we can pass to a function that will forward this somewhere. I can then add an example for slack.
credentials should be passed to the load method. The pipeline should not need credentials to be instantiated. Possible targets may include unauthenticated destinations such as files.
we should overhaul the existing schema to support the following:
- filters with the re: prefix are treated as regexes; plain paths are wrapped as ^...$ for easy specification of filters without knowledge of regexes
- the _dlt prefix
- a mechanism to remove or anonymize parts of the data before they reach the target; implement in the schema filters
- the hashing
Hello folks!
First I want to say thank you for your work on dlt. The idea of a "dbt for EL" + an integration with dbt is just what Data Engineering needed.
A lot of companies don't have an API layer on top of their application databases. They rely on SQL queries to extract transactional data from application databases before loading it to the data warehouse. Maybe there is such a feature but I don't understand how to use it with dlt.
Anyone having relational databases as data sources & relying on "SQLAlchemy-like" connections & SQL to extract data.
Thank you !
Implement the singer source as follows:
- FULL_TABLE replication is supported (#17)
- INCREMENTAL with bookmark is supported on the source level - records will be filtered out based on the column value in state
- the unpacker must be refactored with respect to where load_ids reside, and as a PoolRunnable so it is able to be run; for merge or replace it must merge the files corresponding to a single table; this also means
- often the unpacker produces much better schemas automatically than those included in singer, but the option should exist
- sorted hint (why not :))
Optional:
We need user-readable logs and callback parameters:
- Pipeline / task name
- env name - in Airflow you have a base url that tells you which instance is failing
- logs url
- Schema changes
- error message
- success message:
- Partial success:
- Retry:
DLT will change any column and table name to lower case and will also replace spaces with '_'. So as not to confuse the user of the pipeline, it might be helpful to keep track of and show these changes after the pipeline has finished loading data into the database. This could be done in the load info (showing just how many name changes occurred), by logging info every time there is a name change, and this information should also be in the schema after the pipeline has finished loading.
Additionally, if a column only has NULL values for all rows, it will be dropped. I think this should also be shown to the user. Basically any change of this sort, where a user expects thing A to be loaded and thing B (or nothing) gets loaded, should be shown to the user.
add precision and scale settings to the following types
Motivation
Most of the APIs have their openAPI definitions accessible and it is a common approach to generate python clients automatically. We want to do the same and generate the dlt.resources corresponding to the api endpoints.
The intention is to generate initial code for the dlt user that needs to access a new API. As your first test cases pick Pipedrive and Hubspot (which we know generate very nice datasets with dlt).
For more openAPI specs look in rapidapi.
As mentioned, python clients are generated this way. Here's one that is template based: https://github.com/openapi-generators/openapi-python-client. We may be able to use the same approach.
Requirements
- generate dlt.resources for the endpoints: convert the path and query parameters into input arguments, generate typing and defaults (I believe that you can copy it from the python client generating code) - see the sketch after this list
- pass credentials to the dlt.resource methods and mark them with dlt.secret.value so they are automatically injected
- generate TTableSchema so we have correct types for table columns (let's discuss the nested tables later)
- use the servers property when available. When there are multiple servers, create a mapping and accept as argument/config prop either a server name (e.g. production) or a url https://example.com/api
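A minimal sketch of the very first step only (the spec file name is made up): walk an openAPI spec and collect the GET endpoints with their parameters, which is the raw material for generating dlt.resource functions. A real generator would also emit code, typing and defaults.

import json
from typing import Any, Dict, Iterator, List, Tuple

def iter_get_endpoints(spec: Dict[str, Any]) -> Iterator[Tuple[str, List[Tuple[str, str, bool]]]]:
    for path, methods in spec.get("paths", {}).items():
        get_op = methods.get("get")
        if get_op is None:
            continue
        params = [
            (p["name"], p.get("in", "query"), p.get("required", False))
            for p in get_op.get("parameters", [])
        ]
        yield path, params

with open("pipedrive_openapi.json") as f:
    spec = json.load(f)

for path, params in iter_get_endpoints(spec):
    # e.g. ("/deals", [("api_token", "query", True), ("start", "query", False)])
    print(path, params)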
Additional Heuristics
- dlt.transformer functions that allow pipelining data from lists to those "enriching" endpoints; let's find a way to do it at least in some of the cases
- primary_key hints
- dlt.resources that can take N items or all items across all pages. there are only a few pagination types. alternatively we could pass options like: this is pagination of type x and this is the next_page element etc.
Deliverable: integration with the dlt init command.
DLT crashes when trying to load either the Spotify 'Playlists' or the 'Your Library' JSON files.
Here is the Spotify (archive download) source code: https://github.com/scale-vector/spotify-dlt-demo/blob/main/spotify.py
Here is the error file: dlt_spotify_bug.txt
The Spotify data can be provided if necessary.
Other details
Motivation
dlt automates extracting and loading data into a warehouse. it also infers the table schema from the data and allows adding various hints such as uniqueness and nullability. the loading is "atomic" thanks to the "load_id" mechanism where any new piece of data is marked with a load_id which is flagged as valid after all the data was correctly loaded (see https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/staging/load_ids.sql and https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/views/_loads.sql). The same mechanism allows chaining transformations for every load_id.
dbt is a popular way of defining transformations and dlt implements an extremely easy to use runner (see #28) that executes any dbt package in an isolated python environment. it also manages the dbt profiles and credentials so the same pipeline definition can be used both for loading and transforming.
we want to make this even easier by generating an initial package that will contain all the models and support the chaining mechanism. the user just needs to add transformations for their tables. Here is an example of a dbt package that was started from such a template: https://github.com/dlt-hub/rasa_semantic_schema
Requirements
- use the dbt runner as implemented in #28
- generate the dbt sources from our schemas (our schemas are similar to dbt model yml so it should be quite easy) - see https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/sources.yml
- generate the load_id status update as in https://github.com/dlt-hub/rasa_semantic_schema/blob/master/models/views/_loads.sql
- support bigquery, redshift, postgres, duckdb (in fact the materializations for the _dlt_loads table are so simple that they should work everywhere)
Implementation
Let's start from a standalone script that generates the dbt package from a Pipeline instance passed to it. The instance will contain all relevant schemas with table definitions.
Make the dbt runner work again. The runner present in https://github.com/dlt-hub/dlt/tree/devel/dlt/dbt_runner works with dbt 1.0 and operates by calling the dbt code directly, not via the command line. Let's try to keep that functionality with several enhancements:
- use the destination that must be specified in the Pipeline
- use a profiles.yml that takes credentials from env. Create the required os.environ entries from the credentials present in the Pipeline (the credentials are a Python dict) - creation is trivial, see the sketch below
- Normalize and use config injection for DBTRunnerConfiguration
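A minimal sketch of the env creation (prefix and key names are made up): turn the Pipeline's credentials dict into environment variables that a profiles.yml using env_var() can read.

import os
from typing import Dict

def credentials_to_environ(credentials: Dict[str, str], prefix: str = "DLT_DBT") -> None:
    for key, value in credentials.items():
        os.environ[f"{prefix}__{key.upper()}"] = str(value)

credentials_to_environ({"host": "localhost", "user": "loader", "password": "secret"})
# profiles.yml can then reference e.g. "{{ env_var('DLT_DBT__PASSWORD') }}"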
If an empty row (i.e. {} or {"value": None}) is passed to the pipeline run method, it gets dropped and nothing is loaded.
correct behavior: load a row with all columns set to NULL (except the dlt-added ones)
the bug applies only to rows for existing tables with existing columns. In case of non-existing tables and table stubs (without any columns) such empty rows should not generate records to load
this most probably happens in the normalize stage where rows get dropped if all data is filtered out by exclude filters
Hi hi
I'm just starting my contributions to dlt and ran into my first issue.
It happens when the column is null in the first >= 5000 rows (5000 matches the buffer writer limit),
and then a following row which has a non-null value for the column causes the pipeline to fail.
This pipeline reproduces the issue:
import dlt
import random


@dlt.source
def dummy():
    return [table_rows()]


@dlt.resource(write_disposition='append')
def table_rows():
    for _ in range(5000):
        yield dict(display_name=None, random_number=random.random())
    yield dict(display_name='Dummy name', random_number=random.random())


def main() -> None:
    p = dlt.pipeline(
        pipeline_name='dummy', destination='postgres', dataset_name='dummy_data', full_refresh=False
    )
    info = p.run(dummy())
    print(info)


if __name__ == '__main__':
    main()
Traceback:
Traceback (most recent call last):
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/runners/pool_runner.py", line 79, in run_pool
    run_metrics = run_f.run(cast(TPool, pool))
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 257, in run
    self.spool_schema_files(schema_name, list(files_in_schema))
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 235, in spool_schema_files
    self.spool_files(schema_name, load_id, map_parallel_f, files)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 195, in spool_files
    total_items, schema_updates, chunk_files = map_f(schema, load_id, files)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 171, in map_single
    processed_chunk = Normalize.w_normalize_files(
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/normalize/normalize.py", line 120, in w_normalize_files
    load_storage.close_writers(load_id)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/storages/data_item_storage.py", line 33, in close_writers
    writer.close_writer()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 77, in close_writer
    self._flush_and_close_file()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 105, in _flush_and_close_file
    self._flush_items()
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/buffered.py", line 100, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 151, in write_data
    write_row(rows[-1])
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 136, in write_row
    raise
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 136, in write_row
    raise
  File "/usr/lib/python3.10/bdb.py", line 90, in trace_dispatch
    return self.dispatch_line(frame)
  File "/usr/lib/python3.10/bdb.py", line 115, in dispatch_line
    if self.quitting: raise BdbQuit
bdb.BdbQuit
Traceback (most recent call last):
  File "/home/steinthor/projects/dlt-adrian/dlt/dlt/common/data_writers/writers.py", line 133, in write_row
    output[self._headers_lookup[n]] = self._caps.escape_literal(v)
KeyError: 'display_name'
Not sure what a solution is yet.
Maybe a new file needs to be started when there are schema changes or something along those lines?