aws / aws-sdk-pandas

pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).

Home Page: https://aws-sdk-pandas.readthedocs.io

License: Apache License 2.0

Python 77.46% Shell 0.28% Dockerfile 0.03% Jupyter Notebook 22.23% Batchfile 0.01%
python aws pandas apache-arrow apache-parquet data-engineering etl data-science redshift athena

aws-sdk-pandas's Introduction

AWS SDK for pandas (awswrangler)

Pandas on AWS

Easy integration with Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).


An AWS Professional Service open source initiative | [email protected]


Source | Installation Command
PyPI   | pip install awswrangler
Conda  | conda install -c conda-forge awswrangler

⚠️ Starting version 3.0, optional modules must be installed explicitly:
➡️pip install 'awswrangler[redshift]'

Table of contents

  • Quick Start
  • At scale
  • Getting Help
  • Logging

Quick Start

Installation command: pip install awswrangler

⚠️ Starting version 3.0, optional modules must be installed explicitly:
➡️pip install 'awswrangler[redshift]'

import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# Get a Redshift connection from the Glue Catalog and retrieve data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()

# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],   
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(
    df=df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)

# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")

At scale

AWS SDK for pandas can also run your workflows at scale by leveraging Modin and Ray. Both projects aim to speed up data workloads by distributing processing over a cluster of workers.

Read our docs or head to our latest tutorials to learn more.

⚠️ Ray is currently not available for Python 3.12. While AWS SDK for pandas supports Python 3.12, it cannot be used at scale.
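
Below is a minimal sketch of what running at scale looks like, assuming awswrangler was installed with the modin and ray extras (pip install "awswrangler[modin,ray]"); the engine/memory_format switches follow the names used in the at-scale tutorials and should be treated as version-dependent.

import awswrangler as wr

# With the extras installed, importing awswrangler can initialize Ray and return
# Modin DataFrames automatically; the explicit calls below just make the choice visible.
wr.engine.set("ray")
wr.memory_format.set("modin")

# This read is now distributed across Ray workers and returns a Modin DataFrame.
df = wr.s3.read_parquet(path="s3://bucket/dataset/", dataset=True)
print(type(df))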

Getting Help

The best way to interact with our team is through GitHub. You can open an issue and choose from one of our templates for bug reports, feature requests, and so on. You may also find help on our community resources.

Logging

Examples of enabling internal logging:

import logging
logging.basicConfig(level=logging.INFO, format="[%(name)s][%(funcName)s] %(message)s")
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL)

Inside AWS Lambda:

import logging
logging.getLogger("awswrangler").setLevel(logging.DEBUG)

aws-sdk-pandas's People

Contributors

alvaropc, bechbd, bryanyang0528, cnfait, danielwo, dependabot[bot], flaviomax, hyandell, ia-community-analytics, igorborgest, jaidisido, jiteshsoni, khuengocdang, kukushking, leonluttenberger, luigift, malachi-constant, maxispeicher, nicholas-miles, nickcorbett, patrick-muller, raaidarshad, robert-schmidtke, schot, stijndehaes, timotk, tuliocasagrande, vikramsg, vlieven, weishao-aws


aws-sdk-pandas's Issues

inconsistent NaN casting behavior with partitioned dataset

Hello again! I'm seeing some odd behavior that I'm unsure of the cause. User error is absolutely a possibility!

I am writing out the results of a daily Athena query to parquet. I have string columns that on some days have only NULL values. When I partition the dataset, the null values get written out as the literal string "nan". When I don't partition, they come through as NULLs. I would like to keep NULL values as NULLs if possible. Here's an example setup:

With this CSV:

"col1","col2","col3","col4","pt"
,,,,"1"
,,,,"2"
,"foo","bar","baz","1"
,"foo","bar","baz","2"

Try writing out the data set both with and without the partition col specified. The unpartitioned dataset has NULLs, the partitioned dataset has 'nan' strings:

import pandas as pd
import awswrangler

session = awswrangler.Session()

dtypes = {
    'col1': 'object',
    'col2': 'object',
    'col3': 'object',
    'col4': 'object',
    'pt': 'object'
}
df = pd.read_csv('nan-debug.csv', dtype=dtypes)

schema = {
    'col1': 'string',
    'col2': 'string',
    'col3': 'string',
    'col4': 'string',
    'pt': 'string',
}

session.pandas.to_parquet(
    dataframe=df,
    database="default",
    path="s3://...",
    table="null-debug",
    # partition_cols=['pt'],
    mode="overwrite_partitions",
    cast_columns=schema
)

My guess is that the additional casting for partitioned datasets is the cause, though I haven't had a chance to validate: https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L761-L763
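
For what it's worth, the "nan" strings are consistent with a plain string cast being applied somewhere along the partitioned code path. A pandas-only illustration of the mechanism (not the library's actual code):

import numpy as np
import pandas as pd

s = pd.Series([np.nan, "foo"], dtype="object")
print(s.astype(str).tolist())  # ['nan', 'foo'] -- the NULL becomes the literal string 'nan'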

Thanks!

date type as partition key gets written as timestamp

Hi! I'm trying to write out a partitioned parquet dataset using a date column with values like YYYY-MM-DD as the partition key. However, the partition values in the s3 keys get written as YYYY-MM-DD 00:00:00. The type is date in Glue, and Athena throws errors about invalid partition keys. date types in non-partition columns do not appear to have any issues, though.

Here's a trivial example:

import pandas as pd
import awswrangler
session = awswrangler.Session()

# create some dummy data
data = {
    'col1': [
        'val1',
        'val2',
    ],
    # date column to see valid dates
    'datecol': [
        '2019-11-09',
        '2019-11-08'
    ],
    # dates in partition column to see invalid date strings
    'partcol': [
        '2019-11-09',
        '2019-11-08'
    ]
}

df = pd.DataFrame(data)
# cast date columns to pandas datetime, similar to the result of read_csv() with parse_dates
df = df.astype({'datecol': 'datetime64', 'partcol': 'datetime64'})

# tell Athena what types you want
schema = {
    'col1': 'string',
    'datecol': 'date',
    'partcol': 'date',
}

session.pandas.to_parquet(
    dataframe=df,
    database="default",
    path="s3://some-bucket/some-path/",
    partition_cols=['datecol'],
    table="debug-date-partitions",
    mode="overwrite_partitions",
    cast_columns=schema
)
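
A hypothetical (untested) workaround sketch: format the partition column as plain YYYY-MM-DD strings before writing, so the S3 key values and the Glue partition values stay date-shaped. The column name is taken from the example above.

# Render the partition values as ISO date strings instead of timestamps
df["partcol"] = df["partcol"].dt.strftime("%Y-%m-%d")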

Thanks again for the great library!

Allow pandas.read_sql_athena to pass in Workgroup

The program throws ClientError: An error occurred (404) when calling the HeadObject operation: Not Found when the default workgroup is set to "override client settings" and the caller does not pass in an S3 output location that matches the workgroup's default location.

Example cases:

Athena Environment :
  default workgroup: override client setting

Code:

session = awswrangler.Session()
dataframe = session.pandas.read_sql_athena(
    sql="select * from tables",
    database="database"
)

Result:

~/anaconda3/lib/python3.7/site-packages/awswrangler/pandas.py in _read_csv_once(client_s3, bucket_name, key_path, header, names, usecols, dtype, sep, lineterminator, quotechar, quoting, esc
apechar, parse_dates, infer_datetime_format, encoding, converters)
    400         client_s3.download_fileobj(Bucket=bucket_name,
    401                                    Key=key_path,
--> 402                                    Fileobj=buff)
    403         buff.seek(0),
    404         dataframe = pandas.read_csv(

~/anaconda3/lib/python3.7/site-packages/boto3/s3/inject.py in download_fileobj(self, Bucket, Key, Fileobj, ExtraArgs, Callback, Config)
    676             bucket=Bucket, key=Key, fileobj=Fileobj,
    677             extra_args=ExtraArgs, subscribers=subscribers)
--> 678         return future.result()
    679
    680

~/anaconda3/lib/python3.7/site-packages/s3transfer/futures.py in result(self)
    104             # however if a KeyboardInterrupt is raised we want want to exit
    105             # out of this and propogate the exception.
--> 106             return self._coordinator.result()
    107         except KeyboardInterrupt as e:
    108             self.cancel()

~/anaconda3/lib/python3.7/site-packages/s3transfer/futures.py in result(self)
    263         # final result.
    264         if self._exception:
--> 265             raise self._exception
    266         return self._result
    267

~/anaconda3/lib/python3.7/site-packages/s3transfer/tasks.py in _main(self, transfer_future, **kwargs)
    253             # Call the submit method to start submitting tasks to execute the
    254             # transfer.
--> 255             self._submit(transfer_future=transfer_future, **kwargs)
    256         except BaseException as e:
    257             # If there was an exception raised during the submission of task

~/anaconda3/lib/python3.7/site-packages/s3transfer/download.py in _submit(self, client, config, osutil, request_executor, io_executor, transfer_future, bandwidth_limiter)
    343                 Bucket=transfer_future.meta.call_args.bucket,
    344                 Key=transfer_future.meta.call_args.key,
--> 345                 **transfer_future.meta.call_args.extra_args
    346             )
    347             transfer_future.meta.provide_transfer_size(

~/anaconda3/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    355                     "%s() only accepts keyword arguments." % py_operation_name)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358
    359         _api_call.__name__ = str(py_operation_name)

~/anaconda3/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    659             error_code = parsed_response.get("Error", {}).get("Code")
    660             error_class = self.exceptions.from_code(error_code)
--> 661             raise error_class(parsed_response, operation_name)
    662         else:
    663             return parsed_response

ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
  • This issue does not happen if we pass in the same S3 output location that is set for the default workgroup, but it would be better to have the option to switch workgroups.

Looking for :

session = awswrangler.Session()
dataframe = session.pandas.read_sql_athena(
    sql="select * from tables",
    database="database",
    workgroup="workgroup_w/o_overriding_client_setting"
)
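
For reference, newer releases expose exactly this on the Athena reader (shown with the awswrangler 2.x/3.x API; a sketch, not the Session API requested above):

import awswrangler as wr

df = wr.athena.read_sql_query(
    sql="SELECT * FROM my_table",
    database="database",
    workgroup="my_non_default_workgroup",  # placeholder workgroup name
)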

Pandas SettingWithCopyWarning

~/.virtualenvs/revisions/lib/python3.7/site-packages/awswrangler/pandas.py:896: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dataframe[name] = dataframe[name].astype("float64")

I keep getting a ton of these popping out of my runs. I'm running:

  • pandas==0.25.3
  • awswrangler==0.0.19
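
For reference, the pattern pandas recommends for the kind of assignment the warning points at, as a stand-alone illustration (not a patch to awswrangler itself):

import pandas as pd

df = pd.DataFrame({"value": [1, 2, 3]})
view = df[df["value"] > 1]          # a slice -- assigning into it triggers the warning
safe = view.copy()                  # take an explicit copy first...
safe.loc[:, "value"] = safe["value"].astype("float64")  # ...then assign via .loc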

unable to cast partition column final Athena type

Hello! I need to cast the final Athena type of a partition column to date; however, dropping the partition columns from the dataframe subgroup occurs before cast_columns is applied (here: https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L762). This has two effects:

  1. I get a ValueError: '<partition column name>' is not in list if cast_columns includes the partition key(s). The line that throws the exception is https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L892

  2. If I omit the partition column from cast_columns, things work, but then I am unable to pass the final desired Athena type through to the bit that writes the Glue table (https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L674).

It would be great if the library could handle dropping the partition column names from cast_columns when casting the partition subgroups while ensuring the partition column types make it to the self._session.glue.metadata_to_glue call.
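
One possible shape of that behaviour, sketched with hypothetical variable names:

cast_columns = {"col1": "bigint", "partcol": "date"}
partition_cols = ["partcol"]

# Cast only the non-partition columns on the data subgroups...
subgroup_casts = {k: v for k, v in cast_columns.items() if k not in partition_cols}
# ...but keep the full mapping for the Glue metadata registration.
glue_casts = cast_columns
print(subgroup_casts, glue_casts)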

(Side note: trying a naive patch locally and trying it out leads to issue #59 about date type formatting in partition keys)

Thanks!

Document on how to use lambda layer

Can you please document how to use the Lambda layer instead of just mentioning that it exists? I do not immediately know how to use it. (Referring to: "P.S. Lambda Layer's bundle and Glue's wheel/egg are available to download. Just upload it and run! 🚀")

Thanks a lot
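
A hedged sketch of attaching a published layer to an existing function with boto3; the function name and layer ARN below are placeholders, not real values:

import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="my-wrangler-function",  # placeholder
    Layers=["arn:aws:lambda:<region>:<account>:layer:awswrangler:1"],  # placeholder ARN
)
# After that, `import awswrangler` inside the function resolves to the layer's bundle.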

Pyarrow schema registration in Glue

Issue is similar to #29, but for PyArrow.
PyArrow supports richer types than pandas, in our case ListArray, which translates to array<int> in Glue.
The current implementation requires going through pandas, which stores it in an object column that then gets added to the schema as string.
Looking at the code, it looks like we reconstruct the PyArrow schema anyway, so it might be as simple as exposing this entry point alongside the pandas one.
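
For reference, the modern catalog API lets richer types be registered directly, bypassing pandas dtype inference (awswrangler 2.x/3.x names shown; treat this as a sketch):

import awswrangler as wr

wr.catalog.create_parquet_table(
    database="my_db",
    table="my_table",
    path="s3://bucket/dataset/",
    columns_types={"id": "bigint", "values": "array<int>"},
)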

Request for glue.get_table_pandas_types

Is there a quick way to get metadata from the Glue Catalog and then pass it into pandas.read_csv(), specifically for files without metadata/headers?

Scenario:
The metadata is created in the Glue Catalog first because the data files have no header. The job gets the metadata from the Glue Catalog, then uses it in pandas.read_csv() to deserialize the input file before writing to parquet.
However, partition columns affect the column positions created in the Glue Catalog (they are moved to the end).

In traditional ETL context

  1. Define file structure in data store
  2. Use the defined file template as source in data flow
  3. Parameterize file name for each run (for source file with running date as name)
  4. Update file structure when there is update

In Glue Python:

  1. Define the 'table' structure in the Glue Catalog (no underlying data, not queryable)
  2. Based on the file name, grab the table metadata from the Glue Catalog and pass it into read_csv() to deserialize the headerless file.
  3. Transform the input file to parquet (with the same structure), but output to a new database + table for data lake queries.

By doing this, file structure changes will not require modifying the Python script.

  • Does this make sense? Or is it better to use Glue Spark + Grok expressions for this purpose? (multiple files but small in size)

EOF error when writing to S3

First, thanks for this awesome library! I was testing out some of the features yesterday and hit this error. The workflow was:

  • Read a geospatial dataset off S3 using geopandas
  • Create a geodataframe (a special case of a pandas dataframe)
  • Write to S3 using aws-data-wrangler like so:
session = awswrangler.Session()
session.pandas.to_parquet(
    dataframe=final_gdf.astype({'geometry': str}),
    database='foo',
    table='bar',
    path="s3://bucket/path/to/file/",
    partition_cols=['partition_col'],
    compression='gzip',
    mode='overwrite'
)

It looks like it writes many files but eventually I hit an EOFError error:

---------------------------------------------------------------------------
EOFError                                  Traceback (most recent call last)
<ipython-input-40-0f6d02d0cf6c> in <module>
      5     partition_cols=['STATEFP', 'COUNTYFP', 'TRACTCE'],
      6     compression='gzip',
----> 7     mode='overwrite'
      8 )

/usr/local/lib/python3.7/site-packages/awswrangler/pandas.py in to_parquet(self, dataframe, path, database, table, partition_cols, preserve_index, mode, compression, procs_cpu_bound, procs_io_bound, cast_columns, inplace)
    555                           procs_io_bound=procs_io_bound,
    556                           cast_columns=cast_columns,
--> 557                           inplace=inplace)
    558 
    559     def to_s3(self,

/usr/local/lib/python3.7/site-packages/awswrangler/pandas.py in to_s3(self, dataframe, path, file_format, database, table, partition_cols, preserve_index, mode, compression, procs_cpu_bound, procs_io_bound, cast_columns, extra_args, inplace)
    632                                         procs_io_bound=procs_io_bound,
    633                                         cast_columns=cast_columns,
--> 634                                         extra_args=extra_args)
    635         if database:
    636             self._session.glue.metadata_to_glue(dataframe=dataframe,

/usr/local/lib/python3.7/site-packages/awswrangler/pandas.py in data_to_s3(self, dataframe, path, file_format, partition_cols, preserve_index, mode, compression, procs_cpu_bound, procs_io_bound, cast_columns, extra_args)
    685                 receive_pipes.append(receive_pipe)
    686             for i in range(len(procs)):
--> 687                 objects_paths += receive_pipes[i].recv()
    688                 procs[i].join()
    689                 receive_pipes[i].close()

/usr/local/lib/python3.7/multiprocessing/connection.py in recv(self)
    248         self._check_closed()
    249         self._check_readable()
--> 250         buf = self._recv_bytes()
    251         return _ForkingPickler.loads(buf.getbuffer())
    252 

/usr/local/lib/python3.7/multiprocessing/connection.py in _recv_bytes(self, maxsize)
    405 
    406     def _recv_bytes(self, maxsize=None):
--> 407         buf = self._recv(4)
    408         size, = struct.unpack("!i", buf.getvalue())
    409         if maxsize is not None and size > maxsize:

/usr/local/lib/python3.7/multiprocessing/connection.py in _recv(self, size, read)
    381             if n == 0:
    382                 if remaining == size:
--> 383                     raise EOFError
    384                 else:
    385                     raise OSError("got end of file during message")

EOFError: 

Looks like the connection is getting reset somehow? I'll note that I'm testing this in a jupyter notebook running in a docker container.

Current configuration:

  • MacOS with a Docker container running Debian (stretch)
  • Python 3.7
  • awswrangler 0.0.12

Any ideas what's going on here?

support for np.uint64

The data sets I deal with have a lot of unsigned 64-bit numbers, and I'm struggling to find the best way to keep parquet from mangling them on load/unload.

There may be some issues within pyarrow regarding support?

utilize glue connections for db connections parameters

A nice utility function would be one that builds database connections from Glue connection parameters (by connection name).

Today I'm using boto3 to look up the connection by name, extract the JDBC_CONNECTION_URL, and parse it to build the connection, something like the below:

import boto3
import pg8000


def pg_glue_connection(name):
    cli = boto3.client("glue")
    conn_props = list(filter(lambda x: x["Name"] == name, cli.get_connections()["ConnectionList"]))[0]["ConnectionProperties"]

    url = conn_props["JDBC_CONNECTION_URL"]
    host = url.split("://")[1].split(":")[0]
    port = url.split("://")[1].split(":")[1].split("/")[0]
    dbname = url.split("://")[1].split(":")[1].split("/")[1]
    user = conn_props["USERNAME"]
    password = conn_props["PASSWORD"]

    return pg8000.connect(
        database=dbname, host=host, port=int(port), user=user, password=password, ssl=True, application_name="glue"
    )
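
For reference, newer awswrangler releases build the connection from the Glue connection name directly (2.x/3.x API shown; a sketch, not the Session API of the era):

import awswrangler as wr

con = wr.postgresql.connect("my-glue-connection")  # name of the Glue connection
df = wr.postgresql.read_sql_query("SELECT 1", con=con)
con.close()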

suggest to add pandas.to_sql()

Is it possible to have an ability similar to the function below?

glueContext.write_dynamic_frame.from_jdbc_conf(), e.g.:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=datasource0, catalog_connection="test_red", connection_options={"preactions": "truncate table target_table;", "dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir="s3://s3path", transformation_ctx="datasink4")

Currently the way we do it is (a sketch of this workflow follows below):

  1. Get the SQL from an S3 file and pass it into pandas.read_sql_athena()
  2. Use SQLAlchemy to execute the preaction SQL. In our case it is a delete before load.
  3. Use SQLAlchemy and pandas.to_sql() to append the dataframe into the Aurora table
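
A sketch of that workaround, with placeholder engine URL, table, and column names (not the library's API):

import awswrangler
import sqlalchemy

session = awswrangler.Session()
df = session.pandas.read_sql_athena(sql="SELECT * FROM source_table", database="my_db")  # placeholder query

engine = sqlalchemy.create_engine("postgresql+pg8000://user:pass@host:5432/mydb")  # placeholder URL
with engine.begin() as con:
    con.execute(sqlalchemy.text("DELETE FROM target_table WHERE load_date = CURRENT_DATE"))  # the "preaction"
    df.to_sql("target_table", con=con, if_exists="append", index=False)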

Add parameter to specify file compression for csv files

Add a file compression parameter when writing csv/parquet. For parquet, compression defaults to snappy in parquet.py's write_table, but in to_csv there is no parameter to specify the file compression, i.e. pandas.py:

def to_csv(
    self,
    dataframe,
    path,
    database=None,
    table=None,
    partition_cols=None,
    preserve_index=True,
    mode="append",
    procs_cpu_bound=None,
    procs_io_bound=None,
)

Request is to add a new parameter for specifying the file compression eg - gzip

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html
compression: str, default ‘infer’
Compression mode among the following possible values: {‘infer’, ‘gzip’, ‘bz2’, ‘zip’, ‘xz’, None}. If ‘infer’ and path_or_buf is path-like, then detect compression from the following extensions: ‘.gz’, ‘.bz2’, ‘.zip’ or ‘.xz’. (otherwise no compression).
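
Plain pandas already accepts the value described above; a small end-to-end illustration of what the requested parameter would pass through:

import pandas as pd

df = pd.DataFrame({"id": [1, 2]})
df.to_csv("out.csv.gz", index=False, compression="gzip")  # gzip-compressed CSV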

Unsupported Athena type: timestamp with time zone

Hello! I'm querying Athena for a value with type timestamp with time zone, which gives the following error:

---------------------------------------------------------------------------
UnsupportedType                           Traceback (most recent call last)
<ipython-input-3-34b2c4caaef6> in <module>
      1 df = session.pandas.read_sql_athena(
      2     sql="select current_timestamp as value, typeof(current_timestamp) type",
----> 3     database='default',
      4 )

~/anaconda3/lib/python3.7/site-packages/awswrangler/pandas.py in read_sql_athena(self, sql, database, s3_output, max_result_size)
    427         else:
    428             dtype, parse_timestamps, parse_dates = self._session.athena.get_query_dtype(
--> 429                 query_execution_id=query_execution_id)
    430             path = f"{s3_output}{query_execution_id}.csv"
    431             ret = self.read_csv(path=path,

~/anaconda3/lib/python3.7/site-packages/awswrangler/athena.py in get_query_dtype(self, query_execution_id)
     46         parse_dates = []
     47         for col_name, col_type in cols_metadata.items():
---> 48             ptype = Athena._type_athena2pandas(dtype=col_type)
     49             if ptype in ["datetime64", "date"]:
     50                 parse_timestamps.append(col_name)

~/anaconda3/lib/python3.7/site-packages/awswrangler/athena.py in _type_athena2pandas(dtype)
     37             return "date"
     38         else:
---> 39             raise UnsupportedType(f"Unsupported Athena type: {dtype}")
     40 
     41     def get_query_dtype(self, query_execution_id):

UnsupportedType: Unsupported Athena type: timestamp with time zone

You should be able to reproduce with the following:

import awswrangler

session = awswrangler.Session()

df = session.pandas.read_sql_athena(
    sql="select current_timestamp as value, typeof(current_timestamp) as type",
    database='default',
)
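
A common workaround until the type is supported is to cast to a plain timestamp inside the query itself (Presto/Athena SQL); sketched with the same Session API:

import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="SELECT CAST(current_timestamp AS timestamp) AS value",
    database='default',
)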

Thanks for the great package!

Execution never ends with a bad Redshift connection

I experienced this issue testing aws-data-wrangler in the "Spark to Redshift" use case.

I created a Redshift connection with the Redshift.generate_connection method, and when I tried to load the dataframe to Redshift with session.spark.to_redshift the execution never finished, so I had to cancel it manually. The temporary files on S3 were created, but nothing landed in Redshift. Later I realized that I was passing the port as a string to the connection, while the method expects an int argument; when I changed this the load worked just fine!

My guess is that with a wrong connection setup the wrangler can't reach Redshift, but there is no timeout to stop the execution, so the code gets stuck.

Decimal DataType in Athena is not Supported

Athena has a table with a column of type decimal(13,4).
Out of the box, the awswrangler library does not support this:
raise UnsupportedType(f"Unsupported Athena type: {dtype}")
awswrangler.exceptions.UnsupportedType: Unsupported Athena type: decimal

Unexpected Behavior in read_csv() with max_result_size

I'm using read_csv() with a max_result_size of 128 MB to process a 135 MB csv file. When attempting to write parquet, it returns the following error:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 815, in _data_to_s3_dataset_writer_remote
    isolated_dataframe=True))
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 758, in _data_to_s3_dataset_writer
    isolated_dataframe=isolated_dataframe)
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 856, in _data_to_s3_object_writer
    isolated_dataframe=isolated_dataframe)
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 902, in write_parquet_dataframe
    table = pa.Table.from_pandas(df=dataframe, preserve_index=preserve_index, safe=False)
  File "pyarrow/table.pxi", line 1174, in pyarrow.lib.Table.from_pandas
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/pyarrow/pandas_compat.py", line 501, in dataframe_to_arrays
    convert_fields))
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/pyarrow/pandas_compat.py", line 487, in convert_column
    raise e
  File "/home/root/environment/demo/lib64/python3.6/dist-packages/pyarrow/pandas_compat.py", line 481, in convert_column
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 191, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 78, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 95, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: ("Expected a bytes object, got a 'Timestamp' object", 'Conversion failed for column xxxxx with type object')

When processing the same file using read_csv() without max_result_size, it works fine.

Code Snippet:

df_iter = session.pandas.read_csv(csv, sep='|', header=0, dtype=file_meta, parse_dates=dates_cols, max_result_size=128 * 1024 * 1024)

df_data = pd.DataFrame()  # accumulator (initialization assumed, not shown in the original snippet)
for df in df_iter:
    df_data = df_data.append(df)

session.pandas.to_parquet(dataframe=df_data, database=db_name, table=tbl_name, preserve_index=False, mode='overwrite', path=path)

Cache read_sql_athena

Read the Athena query result from S3 instead of re-running the query for identical queries.
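
For reference, newer releases ship a result cache for identical queries; the parameter below is from the 2.x API (3.x groups the cache options differently), so treat it as a sketch:

import awswrangler as wr

df = wr.athena.read_sql_query(
    "SELECT * FROM my_table",
    database="my_db",
    max_cache_seconds=900,  # reuse a previous execution's results if it ran within the last 15 minutes
)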

PySpark schema registration in Glue

With pandas, it's possible to write a parquet dataframe to S3 and have it registered in Glue so it can be queried by Athena.

It would be nice to support the same feature for PySpark.
PySpark provides richer types (Array, Map, ...) which are mostly supported in Athena and Glue but not in pandas, so converting a PySpark DF to pandas and writing it with awswrangler will corrupt the nested types (arrays, ...) and will not work.

It would be nice to be able to register a PySpark DataFrame schema in Glue, or moreover to register a partitioned PySpark DataFrame's partitions as Glue partitions.

Unsupported Athena type: row

I am getting this error when trying to run a simple query against AWS Athena

Exception has occurred: UnsupportedType
Unsupported Athena type: row

import awswrangler 

database_name = "_results_database"
wrangler = awswrangler.Session(athena_ctas_approach=False)
dataframe = wrangler.pandas.read_sql_athena(
    sql='SELECT * FROM request_response',
    database=database_name
)
print('done')

If I try to limit the data to a single row in order to see if it has to do with a single partition by chance I get a different error, which may be related:

Exception has occurred: ArrowNotImplementedError
Reading lists of structs from Parquet files not yet supported: group: list<array_element: struct<header: struct<e_type: string, id2: string, p_id: string, r_id: string>, request: string, response: struct<appreturncode: int32, apperrors: string, resp_coll: string, rec_prpr: string>>>

import awswrangler 

database_name = "{env}_facets_results_database".format(env="dev")
wrangler = awswrangler.Session(athena_ctas_approach=True)
dataframe = wrangler.pandas.read_sql_athena(
    sql="SELECT * " +
        " FROM _request_response AS fac " +
        " WHERE \"fac\".\"id\" = '1000009'",
    database=database_name
)
print('done')

Maybe there isn't anything you can do though after reading this issue apache/arrow#3371.

0.0b12 incorrectly published?

I upgraded to 0.0b12; however, I do not see some functionality that is tagged as such in the repo.

In tag 0.0.12 I see that read_sql_athena has a new arg max_result_size:
https://github.com/awslabs/aws-data-wrangler/blob/e7c8a2c99e1ca3eb26e434e3c6e8f6910e898200/awswrangler/pandas.py#L419

However, when I pull down 0.0b12, I do not see the arg.

$ grep wrang requirements.txt 
awswrangler==0.0b12

$ ls venv/lib/python3.6/site-packages/awswrangler*
venv/lib/python3.6/site-packages/awswrangler
venv/lib/python3.6/site-packages/awswrangler-0.0b12-py3.6.egg-info

$ grep read_sql_athena venv/lib/python3.6/site-packages/awswrangler/pandas.py 
    def read_sql_athena(self, sql, database, s3_output=None):

Maybe I'm missing something, but I would expect the code tagged with 0.0.12 to be in the artifact versioned as 0.0b12.

Thanks for this awesome library BTW.

Athena read issues (blank parquet cols; csv delimiters)

Hi again, making progress... I'm successfully using awswrangler to write Glue tables along with associated parquet or csv files in S3 now. However, I'm having issues reading this data back with Athena.

I like parquet and haven't had problems with delimiter characters, encoding etc. before beyond having to explicitly set a few columns dtypes using pandas df.astype({'col name':'str'}) before writing to avoid mixed-type columns erroring out.

When I download one of the awswrangler-produced parquet files and open it in pandas on my local machine, everything looks fine. However, when I query the Glue table using Athena I get all the headers and rows, but the majority (not all) of the column data appears blank. Any idea what's going on here - something with datatypes or encoding, perhaps? It is reading from the exact same parquet files...

To work around this, I also tried session.pandas.to_csv. When I download one of these csv files to my local machine, I notice fields with commas in the content are enclosed in quotes (normally I use pipe delimiters, but I don't see an option). Athena doesn't handle this well (and from what I have read, this kind of thing is a general pain with Athena/Presto). Do you plan to implement a "sep" parameter for the session.pandas.to_csv() function, or do you have any suggestion for what I might specify in Athena to match the csv formatting currently output by awswrangler?

Thanks again!

pandas.read_sql_athena() throw ZeroDivisionError

The function throws ZeroDivisionError: division by zero when no rows are returned.

Details:
File "/home/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 547, in read_sql_athena
    procs_cpu_bound=procs_cpu_bound)
File "/home/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 580, in _read_sql_athena_ctas
    return self.read_parquet(path=paths, procs_cpu_bound=procs_cpu_bound)
File "/home/lib64/python3.6/dist-packages/awswrangler/pandas.py", line 1254, in read_parquet
    bounders = calculate_bounders(len(path), procs_cpu_bound)
File "/home/lib64/python3.6/dist-packages/awswrangler/utils.py", line 16, in calculate_bounders
    size = int(num_items / num_groups)

UnsupportedType: Unsupported Redshift type: d

Hello!

I'm getting this error when trying to write a dataframe to Redshift while passing cast_columns. I believe the source of the issue is this line https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L893 passing a dict to data_types.convert_schema, which is then iterated over like a list of tuples instead of dict key/value pairs here: https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/data_types.py#L294

Here's a trivial example:

import awswrangler
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["name1", "name2", "name3"]})

session = awswrangler.Session()

schema = {
    "id": "BIGINT",
    "name": "VARCHAR"
}

session.pandas.to_redshift(
    df,
    's3://example-bucket/',
    None,
    "public",
    "example",
    "arn:aws:iam::123456789:role/rolename",
    cast_columns=schema,
)

Thanks for the great library!

error writing parquet dataframe with only 1 row

Hello! Thanks again for your quick fixes for my last couple of issues!

I'm having an odd problem where writing out a parquet dataframe fails if there's only 1 row.

In the example below, when one partition has only 1 row, I get the error ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column col1 with type Int64').

If you uncomment the additional row in the example below so that each partition has > 1 row, the dataframe writes out fine.

I believe the issue is with this line https://github.com/awslabs/aws-data-wrangler/blob/master/awswrangler/pandas.py#L896. For some reason, slice notation with a single row isn't actually assigning the newly float64-cast dataframe. I'm not entirely sure why that would be.

There also seems to be an interaction with procs_cpu_bound: when it's > 1, the 4-row partitioned dataframe throws the arrow type error that it didn't throw with procs_cpu_bound=1.

Let me know if I can provide any more info!

Thanks!

import pandas as pd
import awswrangler

session = awswrangler.Session(procs_cpu_bound=1)

# create some dummy data
data = {
    'col1': [
        1,
        2,
        3,
        # 4,
    ],
    # date column to see valid dates
    'datecol': [
        '2019-11-09',
        '2019-11-09',
        '2019-11-08',
        # '2019-11-08',
    ],
    # dates in partition column to see invalid date strings
    'partcol': [
        '2019-11-09',
        '2019-11-09',
        '2019-11-08',
        # '2019-11-08',
    ]
}

df = pd.DataFrame(data)
# cast date columns to pandas datetime, similar to the result of read_csv() with parse_dates
df = df.astype({'datecol': 'datetime64', 'partcol': 'datetime64'})

# tell Athena what types you want
schema = {
    'col1': 'bigint',
    'datecol': 'date',
    'partcol': 'date',
}

session.pandas.to_parquet(
    dataframe=df,
    database="default",
    path="s3://...",
    partition_cols=['datecol'],
    table="r...",
    mode="overwrite_partitions",
    cast_columns=schema
)

reading csv and writing parquet in chunks in overwrite mode

Hello! I have some larger CSVs that I need to read from S3 in chunks using max_result_size and write each chunk to parquet. I can't fit the entire pandas df in memory, so I was hoping to loop over the iterator of dataframes and write each chunk. However, every flow that writes to S3 in overwrite mode seems to include deleting any file not associated with that specific operation chain, so it ends up deleting the parquet files from previously written chunks. Do you have any suggestions on how I could accomplish this?
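
One pattern that seems to fit, sketched with the same Session API and placeholder paths: overwrite only on the first chunk, then append the remaining chunks.

import awswrangler

session = awswrangler.Session()
df_iter = session.pandas.read_csv("s3://bucket/big.csv", max_result_size=128 * 1024 * 1024)

for i, chunk in enumerate(df_iter):
    session.pandas.to_parquet(
        dataframe=chunk,
        database="my_db",
        table="my_table",
        path="s3://bucket/output/",
        mode="overwrite" if i == 0 else "append",
    )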

Thanks!

Unable to write from csv to parquet + register glue after 0.0.2

I am having an issue when writing data containing nulls to parquet.
The following error message is returned:

f"Unsupported Pyarrow type for column {name}: {dtype}")
awswrangler.exceptions.UnsupportedType: Unsupported Pyarrow type for column xxxxxxx: null

The same code works fine in version 0.0.2.

Column with Boolean type and null values

I am trying to use the awswrangler library to query the Glue Data Catalog.
I have trouble with boolean-type columns that contain null values.
I am issuing a SQL statement and loading the data into a dataframe.

df = session.pandas.read_sql_athena(sql=query,database=dname)

ValueError: Bool column has NA values in column 7.

I am also including the log for this.
Can you please suggest if there is a way to get around this?

I downloaded the results of the query from athena as a csv and tried loading it into a dataframe and it worked without any issues.

File "/usr/local/lib/python3.6/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/usr/local/lib/python3.6/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/tmp/glue-python-scripts-vd0482dz/SplittingDataFrame.py", line 175, in
File "/glue/lib/installation/awswrangler/pandas.py", line 447, in read_sql_athena
max_result_size=max_result_size)
File "/glue/lib/installation/awswrangler/pandas.py", line 133, in read_csv
converters=converters)
File "/glue/lib/installation/awswrangler/pandas.py", line 414, in _read_csv_once
converters=converters,
File "/glue/lib/installation/pandas/io/parsers.py", line 685, in parser_f
return _read(filepath_or_buffer, kwds)
File "/glue/lib/installation/pandas/io/parsers.py", line 463, in _read
data = parser.read(nrows)
File "/glue/lib/installation/pandas/io/parsers.py", line 1154, in read
ret = self._engine.read(nrows)
File "/glue/lib/installation/pandas/io/parsers.py", line 2059, in read
data = self._reader.read(nrows)
File "pandas/_libs/parsers.pyx", line 881, in pandas._libs.parsers.TextReader.read
File "pandas/_libs/parsers.pyx", line 896, in pandas._libs.parsers.TextReader._read_low_memory
File "pandas/_libs/parsers.pyx", line 973, in pandas._libs.parsers.TextReader._read_rows
File "pandas/_libs/parsers.pyx", line 1105, in pandas._libs.parsers.TextReader._convert_column_data
File "pandas/_libs/parsers.pyx", line 1136, in pandas._libs.parsers.TextReader._convert_tokens
File "pandas/_libs/parsers.pyx", line 1257, in pandas._libs.parsers.TextReader._convert_with_dtype
ValueError: Bool column has NA values in column 7

Support for Aurora Postgresql

Hi,
I just love this utility, thanks. Does it support Aurora PostgreSQL, similar to what we have for Redshift? If not, is there a plan to implement it in the future?
Thanks

Dates get registered as string in Glue but correct in Parquet

Consider the following Pandas Dataframe:

import datetime
import pandas as pd

df = pd.DataFrame(index=[0])  # an existing dataframe is assumed; one empty row here just to make the snippet runnable
df['datetime'] = datetime.datetime.today()
df['normalized_date'] = df['datetime'].dt.normalize()
df['date'] = datetime.date.today()
df = df[['datetime', 'normalized_date', 'date']]

This results in a schema like this:

datetime           datetime64[ns]
normalized_date    datetime64[ns]
date               object

Resulting in this schema in Glue when saved:

[Screenshot: the resulting Glue schema, with the date column registered as string]

While the schema in Parquet looks like this when reading the file with Spark:

root
 |-- datetime: timestamp (nullable = true)
 |-- normalized_date: timestamp (nullable = true)
 |-- date: date (nullable = true)

This issue might be caused in the Glue.type_pandas2athena() function that converts
all pandas 'object'-types to string. Maybe instead of using the Pandas schema, you need to use the pyarrow.Table schema.
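
A quick way to see the divergence described above: PyArrow keeps the date column as a date type while the pandas dtype is just 'object'.

import datetime
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"date": [datetime.date.today()]})
print(df.dtypes)                        # date    object
print(pa.Table.from_pandas(df).schema)  # date: date32[day]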

Error On Athena to Pandas (CTAS Approach)

Encountered error below when using the new approach:

awswrangler.exceptions.QueryFailed: NOT_SUPPORTED: Unsupported Hive type: timestamp with time zone. You may need to manually clean the data at location 's3://aws-athena-query-results/tables/<hash-name>' before retrying. Athena will not delete data in your account.

Not support Array_column

An error occurs when executing a query against a table containing an array column.

session.pandas.read_sql_athena(
... sql="select * from xxxx limit 100",
... database="xxxx"
... )

Traceback (most recent call last):
File "", line 3, in
File "/usr/local/lib/python3.6/site-packages/awswrangler/pandas.py", line 477, in read_sql_athena
max_result_size=max_result_size)
File "/usr/local/lib/python3.6/site-packages/awswrangler/pandas.py", line 136, in read_csv
converters=converters)
File "/usr/local/lib/python3.6/site-packages/awswrangler/pandas.py", line 417, in _read_csv_once
converters=converters,
File "/usr/local/lib64/python3.6/site-packages/pandas/io/parsers.py", line 685, in parser_f
return _read(filepath_or_buffer, kwds)
File "/usr/local/lib64/python3.6/site-packages/pandas/io/parsers.py", line 463, in _read
data = parser.read(nrows)
File "/usr/local/lib64/python3.6/site-packages/pandas/io/parsers.py", line 1154, in read
ret = self._engine.read(nrows)
File "/usr/local/lib64/python3.6/site-packages/pandas/io/parsers.py", line 2059, in read
data = self._reader.read(nrows)
File "pandas/_libs/parsers.pyx", line 881, in pandas._libs.parsers.TextReader.read
File "pandas/_libs/parsers.pyx", line 896, in pandas._libs.parsers.TextReader._read_low_memory
File "pandas/_libs/parsers.pyx", line 973, in pandas._libs.parsers.TextReader._read_rows
File "pandas/_libs/parsers.pyx", line 1083, in pandas._libs.parsers.TextReader._convert_column_data
File "pandas/_libs/parsers.pyx", line 2231, in pandas._libs.parsers._apply_converter
File "/usr/lib64/python3.6/ast.py", line 48, in literal_eval
node_or_string = parse(node_or_string, mode='eval')
File "/usr/lib64/python3.6/ast.py", line 35, in parse
return compile(source, filename, mode, PyCF_ONLY_AST)
File "", line 1
[xxxxx]

Pandas.to_csv() output single file

Is it possible for Pandas.to_csv() to output a single file, with an option to set the file name?

The reason is that RDS doesn't have a COPY mechanism like Redshift that is able to take in multiple files.

relationship to pandasglue? and multithreading error in AWS Lambda

Hi there, I'm getting started with AWS, preparing to implement a data lake. Thanks for creating these packages! I currently have the pandasglue stack installed in an AWS Lambda Layer for lightweight ETL, and I'm running some "hello world" type lambda functions to test. I'm wondering if the scope of the aws-data-wrangler project covers that of pandasglue, or if there are different use cases for each? Maybe I should be using aws-data-wrangler instead? (I didn't see a link to post an issue on the pandasglue github page.)

So far, I was able to read an existing Glue table into a pandas dataframe, but when trying to write I get an error that might be related to multithreading (s3.py calls mp.Pool). Here are a couple of quotes from other posts I found when googling around:

Python's multiprocessing requires /dev/shm to work.

AWS Lambda execution environment does not have /dev/shm (shared memory for processes) support.

My lambda function:


import pandas as pd
import pandasglue as pg

def lambda_handler(event, context):

    #Parameters
    database = "glue-test-db"
    table_name = "pg-test"
    s3_path = "s3://nonlake-test-bucket/pg-test"
    
    #Sample DF
    source_data = {'name': ['Sarah', 'Renata', 'Erika', 'Fernanda', 'Diana'], 
            'city': ['Seattle', 'Sao Paulo', 'Seattle', 'Santiago', 'Lima'],
             'test_score': [82, 52, 56, 234, 254]}
    
    df = pd.DataFrame(source_data, columns = ['name', 'city', 'test_score'])
    
    pg.write_glue(df, database, s3_path, table_name, partition_cols=['city'], mode="overwrite")


output:

[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 18, in lambda_handler
    pg.write_glue(df, database, s3_path, table_name, partition_cols=['city'], mode="overwrite")
  File "/opt/python/lib/python3.7/site-packages/pandasglue/init.py", line 25, in write_glue
    return write.write(**locals())
  File "/opt/python/lib/python3.7/site-packages/pandasglue/write.py", line 124, in write
    s3.delete_objects(path, session_primitives=session_primitives)
  File "/opt/python/lib/python3.7/site-packages/pandasglue/services/s3.py", line 22, in delete_objects
    pool = mp.Pool(mp.cpu_count())
  File "/var/lang/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/var/lang/lib/python3.7/multiprocessing/pool.py", line 158, in init
    self._setup_queues()
  File "/var/lang/lib/python3.7/multiprocessing/pool.py", line 251, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.7/multiprocessing/context.py", line 112, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.7/multiprocessing/queues.py", line 332, in init
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.7/multiprocessing/context.py", line 67, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.7/multiprocessing/synchronize.py", line 162, in init
    SemLock.init(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.7/multiprocessing/synchronize.py", line 59, in init
    unlink_now)

parquet file extension

All of my AWS Glue jobs create parquet files with the .snappy.parquet extension.
To match the existing files to a common standard, I'm requesting that pandas.to_parquet save files with the same extension.
Currently, when saving a pandas df to parquet, the files get a '.parquet.snappy' extension instead of .snappy.parquet.
Example:
sess.pandas.to_parquet(
    dataframe=csvdf,
    path=output_file_path,
    mode=Mode,  # "append", "overwrite", "overwrite_partitions"
    compression="snappy",
    partition_cols=[file_load_date, file_name],
)

Thanks,

Support parquet logical type of MAP

Looking to get support for outputting Tuple[str, str] or non-nested dictionaries (key/value pairs) as the parquet logical type MAP.

I understand the dependent libraries do not yet support it, looking at this pull request: apache/arrow#5774. It looks like development might be wrapping up soon; does it make sense to eventually support it?

Thoughts?

s3fs and server side encryption

Hi again, I'm hoping you can point me in the right direction on something. My company IT has applied some kind of global IAM policy where you have to specify a KMS key to upload anything, even if the bucket encrypts everything going in with the same key by default. In Lambda, I can use boto3 to upload like this:

ExtraArgs={'ServerSideEncryption':'aws:kms','SSEKMSKeyId':'arn:aws:kms:[region-keystring]'}

boto3.Session().resource('s3').Bucket([bucket name]).Object([prefix]).upload_file([file here], ExtraArgs=ExtraArgs) 

Since awswrangler uses s3fs under the hood, I have been trying to figure out if there is a way to pass the KMS ExtraArgs data as above, either through awswrangler session initiation or when calling a method that writes to S3, like session.pandas.to_parquet(). These calls error out when I try to use them, and I suspect that is the reason.

I get "Invalid KeyID" in the log output from calling session.pandas.to_parquet() with valid pandas dataframe:

t/python/s3fs/core.py", line 952, in _call_s3
    **kwargs)
  File "/opt/python/s3fs/core.py", line 182, in _call_s3
    return method(**additional_kwargs)
  File "/opt/python/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/python/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (KMS.NotFoundException) when calling the CreateMultipartUpload operation: Invalid keyId [name of KMS key]
...

I've also tried doing basic writes from Lambda to S3 just using pure s3fs like this and still get Access Denied error.

s3 = s3fs.S3FileSystem(s3_additional_kwargs=ExtraArgs)
with s3.open('same bucket and prefix as above/f.txt', 'wb') as f:
    f.write('lalala')

output:

{
  "errorMessage": "Access Denied",
  "errorType": "PermissionError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 17, in lambda_handler\n    f.write('lalala')\n",
    "  File \"/opt/python/fsspec/spec.py\", line 1157, in __exit__\n    self.close()\n",
    "  File \"/opt/python/fsspec/spec.py\", line 1123, in close\n    self.flush(force=True)\n",
    "  File \"/opt/python/fsspec/spec.py\", line 995, in flush\n    self._initiate_upload()\n",
    "  File \"/opt/python/s3fs/core.py\", line 966, in _initiate_upload\n    raise translate_boto_error(e)\n"
  ]
}
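
For reference, newer awswrangler releases accept the same ExtraArgs-style keywords directly on the S3 writers (2.x/3.x API shown; the key ARN is a placeholder):

import awswrangler as wr
import pandas as pd

wr.s3.to_parquet(
    df=pd.DataFrame({"id": [1]}),
    path="s3://bucket/prefix/",
    dataset=True,
    s3_additional_kwargs={
        "ServerSideEncryption": "aws:kms",
        "SSEKMSKeyId": "arn:aws:kms:<region>:<account>:key/<key-id>",  # placeholder
    },
)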

Any ideas appreciated - thanks!

NaT value for date column is being saved as "0001-01-01"

df = pd.DataFrame({
    "col1": ["val1", "val2"],
    "datecol": [datetime.datetime.now().date(), pd.NaT],
    'partcol': ["2019-11-09", "2019-11-08"]
})

session.pandas.to_parquet(dataframe=df,
                          database="database_name",
                          table="table_name",
                          path="s3://...",
                          mode="overwrite_partitions",
                          preserve_index=False,
                          procs_cpu_bound=1,
                          partition_cols=["partcol"],
                          cast_columns={"datecol": "date"})

If I query this data on Athena, it returns "0001-01-01" instead of null.

EMR File System (EMRFS) Integration

Add a synchronization feature for when the EMRFS metastore is enabled, e.g. when parquet files are created by EMR and then deleted from S3 with AWS Lambda (or a local boto app).
