Coder Social home page Coder Social logo

cid-harvard / pandas-to-postgres Goto Github PK

View Code? Open in Web Editor NEW
54.0 54.0 12.0 66 KB

Copy Pandas DataFrames and HDF5 files to PostgreSQL database

License: BSD 3-Clause "New" or "Revised" License

Python 100.00%
data-pipeline database etl hdf hdf5 pandas postgres postgresql pytables python3 scientific-computing sqlalchemy sqlalchemy-database

pandas-to-postgres's People

Contributors

bleonard33 avatar makmanalp avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

pandas-to-postgres's Issues

Any known issues using this with Python 2.7?

Hello and thank you for sharing this code; it's exactly what I'm looking for.

My code base is currently Python 2.7. Do you happen to know whether there are any issues on that version?

No option to append to tables instead of truncating them

Thanks for the project it helps with the slow process for Dataframe to_sql and it's more straight to proceed with odoo or others library.

For now i have an issue that i am doing dataframe copy inside a for loop but it seems to overwrite table each time it push data.

connection_string = 'postgresql://postgres:test_password@localhost:5432/data'
sql_engine = create_engine(connection_string, echo=False, pool_size=10, max_overflow=-1)
Base = automap_base()
Base.prepare(sql_engine, reflect=True)
my_model = Base.metadata.tables['my_model']


for index, x in enumerate(flat_list_of_bucketes[10:]):
    if len(pd.read_sql('SELECT * FROM imported_files WHERE file_path=%(file_path)s', con=sql_engine, params={'file_path': x['path']})) == 0:
        exec_by = 'hostname: ' + platform.node() + ', python_version: '+ platform.python_version() + ', created_at: ' + dt.datetime.utcnow().strftime('%d-%M-%YT%H:%m')
        now = dt.datetime.utcnow()
        t = TicToc()
        print('start reading the file :' + x['path'])
        with t,  fs.open(x['path']) as f:
            df = pd.read_csv(f, compression='gzip', header=0, low_memory=False)
            t.toc('reading data takes ', restart=True)
            df['id'] = [uuid.uuid1() for _ in range(len(df.index))]
            df['created_by'] = exec_by
            df['created_at'] = now
            t.toc('cleansing data takes: ', restart=True)
           
            with  sql_engine.connect() as c:
                       DataFrameCopy(df.copy(), conn=c, table_obj=my_model)
            t.toc('data save to postgres in: ')
            total_pushed_rows = total_pushed_rows + len(df)
            print('Total pushed rows :' + str(total_pushed_rows))
            df_meta_info = pd.DataFrame(data={'file_path': [x['path']],
                                              'imported_at': [now],
                                              'imported_by': [exec_by],
                                              'meta_data':[str(x)]})

            df_meta_info.to_sql('imported_files', con=sql_engine, if_exists='append', index=False)
            t.toc('pushing metadata to postgres takes ', restart=True)
            print('file was saved to the database :' + df_meta_info['file_path'][0])
    print('total of % of processed files are :' + str(index / total_num_of_files) + '%')

Trying to figure out if i missing any options to append similair to the to_sql or i need to manage the commit myself, but no clue for now any help on how to solve this?

Support for partitioned tables with inherited constraints

In a particular use case, I would like to use this library to COPY data into a child partition of a table. Primary key is defined on the parent table and it is inherited by the child tables.
However, PostgreSQL doesn’t allow dropping inherited primary keys from the child partition. As a result, the code gives an error when primary key is added to the child table after copy, since the child table has inherited constrains which didn't drop in the drop step. Here is the full error traceback :-

InvalidTableDefinition                    Traceback (most recent call last)
~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1245                     self.dialect.do_execute(
-> 1246                         cursor, statement, parameters, context
   1247                     )

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    587     def do_execute(self, cursor, statement, parameters, context=None):
--> 588         cursor.execute(statement, parameters)
    589 

InvalidTableDefinition: multiple primary keys for table "forms_fz" are not allowed


The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
<ipython-input-185-30a0987f282e> in <module>
      3 
      4 with db.engine.connect() as c:
----> 5   DataFrameCopy(forms_df, conn=c, table_obj=table_model).copy()
      6 

~/Documents/MyDev/pandas-to-postgres-0.0.4/pandas_to_postgres/copy_df.py in copy(self, functions)
     51             self.logger.info("All chunks copied ({} rows)".format(self.rows))
     52 
---> 53         self.create_pk()
     54         self.create_fks()
     55         self.analyze()

~/Documents/MyDev/pandas-to-postgres-0.0.4/pandas_to_postgres/_base_copy.py in create_pk(self)
     78         """Create primary key constraints on PostgreSQL table"""
     79         self.logger.info("Creating {} primary key".format(self.sql_table))
---> 80         self.conn.execute(AddConstraint(self.primary_key))
     81 
     82     def drop_fks(self):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
    980             raise exc.ObjectNotExecutableError(object_)
    981         else:
--> 982             return meth(self, multiparams, params)
    983 
    984     def _execute_function(self, func, multiparams, params):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/sql/ddl.py in _execute_on_connection(self, connection, multiparams, params)
     70 
     71     def _execute_on_connection(self, connection, multiparams, params):
---> 72         return connection._execute_ddl(self, multiparams, params)
     73 
     74     def execute(self, bind=None, target=None):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_ddl(self, ddl, multiparams, params)
   1042             compiled,
   1043             None,
-> 1044             compiled,
   1045         )
   1046         if self._has_events or self.engine._has_events:

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1248         except BaseException as e:
   1249             self._handle_dbapi_exception(
-> 1250                 e, statement, parameters, cursor, context
   1251             )
   1252 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1474                 util.raise_from_cause(newraise, exc_info)
   1475             elif should_wrap:
-> 1476                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1477             else:
   1478                 util.reraise(*exc_info)

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
    396     exc_type, exc_value, exc_tb = exc_info
    397     cause = exc_value if exc_value is not exception else None
--> 398     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    399 
    400 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    150             value.__cause__ = cause
    151         if value.__traceback__ is not tb:
--> 152             raise value.with_traceback(tb)
    153         raise value
    154 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1244                 if not evt_handled:
   1245                     self.dialect.do_execute(
-> 1246                         cursor, statement, parameters, context
   1247                     )
   1248         except BaseException as e:

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    586 
    587     def do_execute(self, cursor, statement, parameters, context=None):
--> 588         cursor.execute(statement, parameters)
    589 
    590     def do_execute_no_params(self, cursor, statement, context=None):

ProgrammingError: (psycopg2.errors.InvalidTableDefinition) multiple primary keys for table "forms_fz" are not allowed

[SQL: ALTER TABLE forms_fz ADD CONSTRAINT forms_fz_pkey PRIMARY KEY (foundation, instance_id)]
(Background on this error at: http://sqlalche.me/e/f405)```

Foreign Keys not being recreated

We DROP CASCADE PKs, so later tables that had their FKs dropped in the cascade don't have self.fks instantiated (specifically if created with defer_sql_obs=True) because they have already been dropped.

Changes not rolled back when errors

If there is a problem in the COPY stage - for example when there is a mismatch between the table and the DataFrame column names, the code gives an error as expected. But before I can run the same code again, I have to go back and recreate the table in the database because the primary keys were dropped from the table and weren't added back in the errored run. Kindly take a look!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.