
azure-data-lake-store-python's Introduction

azure-datalake-store

A pure-Python interface to the Azure Data Lake Store (Gen 1) service, providing Pythonic file-system and file objects, seamless handling of Windows and POSIX remote path styles, and a high-performance uploader and downloader.

This software is under active development and not yet recommended for general use.

Note: This library supports ADLS Gen 1 only. For Gen 2, please see the azure-storage-file-datalake package.

Installation

Using pip:

pip install azure-datalake-store

Manually (bleeding edge):
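
A from-source install would typically look something like the following (a sketch; the repository is the one referenced later in this document, and you may want to check out the dev branch for the truly bleeding edge):

git clone https://github.com/Azure/azure-data-lake-store-python.git
cd azure-data-lake-store-python
pip install -e .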

Auth

Although users can generate and supply their own tokens to the base file-system class, and there is a password-based function in the lib module for generating tokens, the most convenient way to supply credentials is via environment variables. This latter method is the one used by default in the library. The following variables are required (see the sketch after this list):

  • azure_tenant_id

  • azure_username

  • azure_password

  • azure_store_name

  • azure_url_suffix (optional)
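
A minimal sketch of supplying these from Python before constructing the filesystem (all values are placeholders):

import os

os.environ['azure_tenant_id'] = 'my-tenant-id'
os.environ['azure_username'] = 'my-username'
os.environ['azure_password'] = 'my-password'
os.environ['azure_store_name'] = 'mystore'
# os.environ['azure_url_suffix'] = '...'  # optional; only for non-default endpoints

from azure.datalake.store import core
adl = core.AzureDLFileSystem()  # picks up the variables set above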

Pythonic Filesystem

The AzureDLFileSystem object is the main API of this package. It provides typical file-system operations on the remote Azure store.

from azure.datalake.store import core, lib

token = lib.auth(tenant_id, username, password)
adl = core.AzureDLFileSystem(token, store_name=store_name)
# alternatively, adl = core.AzureDLFileSystem()
# uses environment variables

print(adl.ls())  # list files in the root directory
for item in adl.ls(detail=True):
    print(item)  # same, but with file details as dictionaries
print(adl.walk(''))  # list all files at any directory depth
print('Usage:', adl.du('', deep=True, total=True))  # total bytes usage
adl.mkdir('newdir')  # create directory
adl.touch('newdir/newfile') # create empty file
adl.put('remotefile', '/home/myuser/localfile') # upload a local file

In addition, the file-system generates file objects that are compatible with the Python file interface, ensuring compatibility with libraries that work on Python file objects. The recommended way to use them is with a context manager (otherwise, be sure to call close() on the file object).

with adl.open('newfile', 'wb') as f:
    f.write(b'index,a,b\n')
    f.tell()   # now at position 10
    f.flush()  # forces data upstream
    f.write(b'0,1,True')

with adl.open('newfile', 'rb') as f:
    print(f.readlines())

import pandas as pd

with adl.open('newfile', 'rb') as f:
    df = pd.read_csv(f)  # read directly into a pandas DataFrame

To seamlessly handle remote path representations across all supported platforms, the main API accepts several path types: string, pathlib Path/PurePath, and AzureDLPath. On Windows in particular, paths may use either forward slashes or backslashes as separators.

try:
    import pathlib               # Python >= 3.4
except ImportError:
    import pathlib2 as pathlib   # Python <= 3.3 (backport package)

from azure.datalake.store.core import AzureDLPath

# possible remote paths to use on API
p1 = '\\foo\\bar'
p2 = '/foo/bar'
p3 = pathlib.PurePath('\\foo\\bar')
p4 = pathlib.PureWindowsPath('\\foo\\bar')
p5 = pathlib.PurePath('/foo/bar')
p6 = AzureDLPath('\\foo\\bar')
p7 = AzureDLPath('/foo/bar')

# p1, p3, and p6 only work on Windows
for p in [p1, p2, p3, p4, p5, p6, p7]:
    with adl.open(p, 'rb') as f:
        print(f.readlines())

Performant up-/down-loading

The ADLUploader and ADLDownloader classes chunk large files and transfer many files to/from Azure using multiple threads. A whole directory tree, files matching a specific glob pattern, or any single file can be transferred.

# download the whole directory structure using 5 threads, 16MB chunks
ADLDownloader(adl, '', 'my_temp_dir', 5, 2**24)
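
Uploads work the same way in the other direction; for example (remote and local path names below are placeholders):

# upload a local directory tree using 5 threads, 16MB chunks
ADLUploader(adl, 'remote_dir', 'my_local_dir', 5, 2**24, overwrite=True)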

API

class azure.datalake.store.core.AzureDLFileSystem(token=None, per_call_timeout_seconds=60, **kwargs)

Access Azure DataLake Store as if it were a file-system

  • Parameters

    store_name: str ("")

      Store name to connect to.
    

    token: credentials object

      When setting up a new connection, this contains the authorization
      credentials (see lib.auth()).
    

    url_suffix: str (None)

      Domain to send REST requests to. The end-point URL is constructed
      using this and the store_name. If None, use default.
    

    api_version: str (2018-09-01)

      The API version to target with requests. Changing this value will
      change the behavior of the requests, and can cause unexpected or
      breaking behavior. Change this value with caution.
    

    per_call_timeout_seconds: float (60)

      Timeout, in seconds, for each call made via the requests library.
    

    kwargs: optional key/values

      See `lib.auth()`; full list: tenant_id, username, password, client_id,
      client_secret, resource
    
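As an illustration of how these parameters combine with lib.auth() for a service-principal connection (tenant, client id, secret and store name are placeholders):

from azure.datalake.store import core, lib

token = lib.auth(tenant_id='my-tenant-id',
                 client_id='my-client-id',
                 client_secret='my-client-secret')
adl = core.AzureDLFileSystem(token, store_name='mystore')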

Methods

access(self, path, invalidate_cache=True)

Does such a file/directory exist?

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    invalidate_cache: bool

      Whether to invalidate cache
    
  • Returns

    True or False depending on whether the path exists.

cat(self, path)

Return contents of file

  • Parameters

    path: str or AzureDLPath

      Path to query
    
  • Returns

    Contents of file

chmod(self, path, mod)

Change access mode of path

Note this is not recursive.

  • Parameters

    path: str

      Location to change
    

    mod: str

      Octal representation of access, e.g., "0777" for public read/write.
      See the WebHDFS permission docs:
      http://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Permission
    
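A minimal illustration, using the file created in the earlier examples:

adl.chmod('newdir/newfile', '0770')  # owner and group get read/write/execute, no access for others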

chown(self, path, owner=None, group=None)

Change owner and/or owning group

Note this is not recursive.

  • Parameters

    path: str

      Location to change
    

    owner: str

      UUID of owning entity
    

    group: str

      UUID of group
    

concat(self, outfile, filelist, delete_source=False)

Concatenate a list of files into one new file

  • Parameters

    outfile: path

      The file which will be concatenated to. If it already exists,
      the extra pieces will be appended.
    

    filelist: list of paths

      Existing adl files to concatenate, in order
    

    delete_source: bool (False)

      If True, assume that the paths to concatenate exist alone in a
      directory, and delete that whole directory when done.
    
  • Returns

    None
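
A minimal illustration (file names are hypothetical):

adl.concat('logs/combined.csv', ['logs/part-0.csv', 'logs/part-1.csv'])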

connect(self)

Establish connection object.

cp(self, path1, path2)

Not implemented. Copy file between locations on ADL

classmethod current()

Return the most recently created AzureDLFileSystem

df(self, path)

Resource summary of path

  • Parameters

    path: str

      Path to query
    

du(self, path, total=False, deep=False, invalidate_cache=True)

Bytes in keys at path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    total: bool

      Return the total (sum of sizes) rather than a list
    

    deep: bool

      Recursively enumerate or just use files under current dir
    

    invalidate_cache: bool

      Whether to invalidate cache
    
  • Returns

    List of dict of name:size pairs or total size.

exists(self, path, invalidate_cache=True)

Does such a file/directory exist?

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    invalidate_cache: bool

      Whether to invalidate cache
    
  • Returns

    True or False depending on whether the path exists.

get(self, path, filename)

Stream data from file at path to local filename

  • Parameters

    path: str or AzureDLPath

      ADL Path to read
    

    filename: str or Path

      Local file path to write to
    
  • Returns

    None

get_acl_status(self, path)

Gets Access Control List (ACL) entries for the specified file or directory.

  • Parameters

    path: str

      Location to get the ACL.
    

glob(self, path, details=False, invalidate_cache=True)

Find files (not directories) by glob-matching.

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    details: bool

      Whether to include file details
    

    invalidate_cache: bool

      Whether to invalidate cache
    
  • Returns

    List of files
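
For example (the path is hypothetical):

adl.glob('data/*.csv')                 # list of matching file names
adl.glob('data/*.csv', details=True)   # same, with file info dictionaries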

head(self, path, size=1024)

Return first bytes of file

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    size: int

      How many bytes to return
    
  • Returns

    First `size` bytes of file

info(self, path, invalidate_cache=True, expected_error_code=None)

File information for path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    invalidate_cache: bool

      Whether to invalidate cache or not
    

    expected_error_code: int

      Optionally indicates a specific, expected error code, if any.
    
  • Returns

    File information

invalidate_cache(self, path=None)

Remove entry from object file-cache

  • Parameters

    path: str or AzureDLPath

      Remove the path from object file-cache
    
  • Returns

    None

listdir(self, path='', detail=False, invalidate_cache=True)

List all elements under directory specified with path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    detail: bool

      Detailed info or not.
    

    invalidate_cache: bool

      Whether to invalidate cache or not
    
  • Returns

    List of elements under directory specified with path

ls(self, path='', detail=False, invalidate_cache=True)

List all elements under directory specified with path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    detail: bool

      Detailed info or not.
    

    invalidate_cache: bool

      Whether to invalidate cache or not
    
  • Returns

    List of elements under directory specified with path

merge(self, outfile, filelist, delete_source=False)

Concatenate a list of files into one new file

  • Parameters

    outfile: path

      The file which will be concatenated to. If it already exists,
      the extra pieces will be appended.
    

    filelist: list of paths

      Existing adl files to concatenate, in order
    

    delete_source: bool (False)

      If True, assume that the paths to concatenate exist alone in a
      directory, and delete that whole directory when done.
    
  • Returns

    None

mkdir(self, path)

Make new directory

  • Parameters

    path: str or AzureDLPath

      Path to create directory
    
  • Returns

    None

modify_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None)

Modify existing Access Control List (ACL) entries on a file or folder. If the entry does not exist it is added, otherwise it is updated based on the spec passed in. No entries are removed by this process (unlike set_acl).

Note: this is by default not recursive, and applies only to the file or folder specified.

  • Parameters

    path: str

      Location to set the ACL entries on.
    

    acl_spec: str

      The ACL specification to use in modifying the ACL at the path in the format
      '[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,…'
    

    recursive: bool

      Specifies whether to modify ACLs recursively or not
    

mv(self, path1, path2)

Move file between locations on ADL

  • Parameters

    path1:

      Source Path
    

    path2:

      Destination path
    
  • Returns

    None

open(self, path, mode='rb', blocksize=33554432, delimiter=None)

Open a file for reading or writing

  • Parameters

    path: string

      Path of file on ADL
    

    mode: string

      One of 'rb', 'ab' or 'wb'
    

    blocksize: int

      Size of data-node blocks if reading
    

    delimiter: byte(s) or None

      For writing delimiter-ended blocks
    
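A sketch of the delimiter option, based on the parameter description above: writes are sent to the store in blocks that end on the delimiter, so records are not split across uploaded blocks (path and data are illustrative):

with adl.open('logs/events.csv', 'ab', delimiter=b'\n') as f:
    f.write(b'2016-10-06,upload,ok\n')
    f.write(b'2016-10-06,download,ok\n')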

put(self, filename, path, delimiter=None)

Stream data from local filename to file at path

  • Parameters

    filename: str or Path

      Local file path to read from
    

    path: str or AzureDLPath

      ADL Path to write to
    

    delimiter:

      Optional delimiter for delimiter-ended blocks
    
  • Returns

    None

read_block(self, fn, offset, length, delimiter=None)

Read a block of bytes from an ADL file

Starting at offset of the file, read length bytes. If delimiter is set then we ensure that the read starts and stops at delimiter boundaries that follow the locations offset and offset + length. If offset is zero then we start at zero. The bytestring returned WILL include the end delimiter string.

If offset + length is beyond the EOF, reads to EOF.

  • Parameters

    fn: string

      Path to filename on ADL
    

    offset: int

      Byte offset to start read
    

    length: int

      Number of bytes to read
    

    delimiter: bytes (optional)

      Ensure reading starts and stops at delimiter bytestring
    

Examples

>>> adl.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
b'Alice, 100\nBo'
>>> adl.read_block('data/file.csv', 0, 13, delimiter=b'\n')  # doctest: +SKIP
b'Alice, 100\nBob, 200\n'

Use length=None to read to the end of the file.

>>> adl.read_block('data/file.csv', 0, None, delimiter=b'\n')  # doctest: +SKIP
b'Alice, 100\nBob, 200\nCharlie, 300'

remove(self, path, recursive=False)

Remove a file or directory

  • Parameters

    path: str or AzureDLPath

      The location to remove.
    

    recursive: bool (False)

      Whether to remove also all entries below, i.e., which are returned
      by walk().
    
  • Returns

    None

remove_acl(self, path)

Remove the entire non-default ACL from the file or folder, including unnamed entries. Default entries cannot be removed this way; please use remove_default_acl for that.

Note: this is not recursive, and applies only to the file or folder specified.

  • Parameters

    path: str

      Location to remove the ACL.
    

remove_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None)

Remove existing, named, Access Control List (ACL) entries on a file or folder. If the entry does not exist already it is ignored. Default entries cannot be removed this way, please use remove_default_acl for that. Unnamed entries cannot be removed in this way, please use remove_acl for that.

Note: this is by default not recursive, and applies only to the file or folder specified.

  • Parameters

    path: str

      Location to remove the ACL entries.
    

    acl_spec: str

      The ACL specification to remove from the ACL at the path in the format (note that the permission portion is missing)
      '[default:]user|group|other:[entity id or UPN],[default:]user|group|other:[entity id or UPN],…'
    

    recursive: bool

      Specifies whether to remove ACLs recursively or not
    

remove_default_acl(self, path)

Remove the entire default ACL from the folder. Default entries do not exist on files, if a file is specified, this operation does nothing.

Note: this is not recursive, and applies only to the folder specified.

  • Parameters

    path: str

      Location to remove the default ACL from.
    

rename(self, path1, path2)

Move file between locations on ADL

  • Parameters

    path1:

      Source Path
    

    path2:

      Destination path
    
  • Returns

    None

rm(self, path, recursive=False)

Remove a file or directory

  • Parameters

    path: str or AzureDLPath

      The location to remove.
    

    recursive: bool (False)

      Whether to remove also all entries below, i.e., which are returned
      by walk().
    
  • Returns

    None

rmdir(self, path)

Remove empty directory

  • Parameters

    path: str or AzureDLPath

      Directory path to remove
    
  • Returns

    None

set_acl(self, path, acl_spec, recursive=False, number_of_sub_process=None)

Set the Access Control List (ACL) for a file or folder.

Note: this is by default not recursive, and applies only to the file or folder specified.

  • Parameters

    path: str

      Location to set the ACL on.
    

    acl_spec: str

      The ACL specification to set on the path in the format
      '[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,…'
    

    recursive: bool

      Specifies whether to set ACLs recursively or not
    
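For illustration, one spec following the format above, setting the standard entries and granting read/execute to a single named user (the object id is a placeholder):

adl.set_acl('data/folder',
            'user::rwx,group::r-x,other::---,'
            'user:00000000-0000-0000-0000-000000000000:r-x')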

set_expiry(self, path, expiry_option, expire_time=None)

Set or remove the expiration time on the specified file. This operation can only be executed against files.

Note: Folders are not supported.

  • Parameters

    path: str

      File path to set or remove expiration time
    

    expire_time: int

      The time that the file will expire, corresponding to the expiry_option that was set
    

    expiry_option: str

      Indicates the type of expiration to use for the file:
    
          1. NeverExpire: ExpireTime is ignored.

          2. RelativeToNow: ExpireTime is an integer in milliseconds representing the expiration date relative to when file expiration is updated.

          3. RelativeToCreationDate: ExpireTime is an integer in milliseconds representing the expiration date relative to file creation.

          4. Absolute: ExpireTime is an integer in milliseconds, as a Unix timestamp relative to 1/1/1970 00:00:00.
    
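For example, to have a file expire one day from now (the path is hypothetical; the time is in milliseconds as described above):

adl.set_expiry('tmp/report.csv', 'RelativeToNow',
               expire_time=24 * 60 * 60 * 1000)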

stat(self, path, invalidate_cache=True, expected_error_code=None)

File information for path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    invalidate_cache: bool

      Whether to invalidate cache or not
    

    expected_error_code: int

      Optionally indicates a specific, expected error code, if any.
    
  • Returns

    File information

tail(self, path, size=1024)

Return last bytes of file

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    size: int

      How many bytes to return
    
  • Returns

    Last `size` bytes of file

touch(self, path)

Create empty file

  • Parameters

    path: str or AzureDLPath

      Path of file to create
    
  • Returns

    None

unlink(self, path, recursive=False)

Remove a file or directory

  • Parameters

    path: str or AzureDLPath

      The location to remove.
    

    recursive: bool (False)

      Whether to remove also all entries below, i.e., which are returned
      by walk().
    
  • Returns

    None

walk(self, path='', details=False, invalidate_cache=True)

Get all files below given path

  • Parameters

    path: str or AzureDLPath

      Path to query
    

    details: bool

      Whether to include file details
    

    invalidate_cache: bool

      Whether to invalidate cache
    
  • Returns

    List of files

class azure.datalake.store.multithread.ADLUploader(adlfs, rpath, lpath, nthreads=None, chunksize=268435456, buffersize=4194304, blocksize=4194304, client=None, run=True, overwrite=False, verbose=False, progress_callback=None, timeout=0)

Upload local file(s) using chunks and threads

Launches multiple threads for efficient uploading, with chunksize assigned to each. The path can be a single file, a directory of files or a glob pattern.

  • Parameters

    adlfs: ADL filesystem instance

    rpath: str

      remote path to upload to; if multiple files, this is the directory
      root to write within
    

    lpath: str

      local path. Can be single file, directory (in which case, upload
      recursively) or glob pattern. Recursive glob patterns using ** are
      not supported.
    

    nthreads: int [None]

      Number of threads to use. If None, uses the number of cores.
    

    chunksize: int [2**28]

      Number of bytes for a chunk. Large files are split into chunks. Files
      smaller than this number will always be transferred in a single thread.
    

    buffersize: int [2**22]

      Number of bytes for internal buffer. This block cannot be bigger than
      a chunk and cannot be smaller than a block.
    

    blocksize: int [2**22]

      Number of bytes for a block. Within each chunk, we write a smaller
      block for each API call. This block cannot be bigger than a chunk.
    

    client: ADLTransferClient [None]

      Set an instance of ADLTransferClient when finer-grained control over
      transfer parameters is needed. Ignores nthreads and chunksize
      set by constructor.
    

    run: bool [True]

      Whether to begin executing immediately.
    

    overwrite: bool [False]

      Whether to forcibly overwrite existing files/directories. If False and
      the remote path is a directory, will quit regardless of whether any files
      would be overwritten. If True, only matching filenames are actually
      overwritten.
    

    progress_callback: callable [None]

      Callback for progress with signature function(current, total) where
      current is the number of bytes transferred so far, and total is the
      size of the blob, or None if the total size is unknown.
    

    timeout: int (0)

      Default value 0 means infinite timeout. Otherwise, time in seconds before the
      process will stop and raise an exception if the transfer is still in progress.
    
  • Attributes

    hash

Methods

active(self)

Return whether the uploader is active

static clear_saved()

Remove references to all persisted uploads.

static load()

Load list of persisted transfers from disk, for possible resumption.

  • Returns

    A dictionary of upload instances, keyed by auto-generated unique
    hashes. The state of the chunks (completed, errored, etc.) can be
    seen in the status attribute. Instances can be resumed with `run()`.
    
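A minimal resumption sketch, assuming an earlier upload was interrupted after its state had been persisted:

from azure.datalake.store.multithread import ADLUploader

resumable = ADLUploader.load()   # dict keyed by auto-generated hash
for key, up in resumable.items():
    up.run()                     # resume the remaining chunks, blocking until done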

run(self, nthreads=None, monitor=True)

Populate transfer queue and execute uploads

  • Parameters

    nthreads: int [None]

      Override default nthreads, if given
    

    monitor: bool [True]

      To watch and wait (block) until completion.
    

save(self, keep=True)

Persist this upload

Saves a copy of this transfer process in its current state to disk. This is done automatically for a running transfer, so that as a chunk is completed, this is reflected. Thus, if a transfer is interrupted, e.g., by user action, the transfer can be restarted at another time. All chunks that were not already completed will be restarted at that time.

See the load method to retrieve saved transfers and run to resume a stopped transfer.

  • Parameters

    keep: bool (True)

      If True, the transfer will be saved if some chunks remain to be
      completed; otherwise, it will be removed from the saved state.
    

successful(self)

Return whether the uploader completed successfully.

It will raise AssertionError if the uploader is active.

class azure.datalake.store.multithread.ADLDownloader(adlfs, rpath, lpath, nthreads=None, chunksize=268435456, buffersize=4194304, blocksize=4194304, client=None, run=True, overwrite=False, verbose=False, progress_callback=None, timeout=0)

Download remote file(s) using chunks and threads

Launches multiple threads for efficient downloading, with chunksize assigned to each. The remote path can be a single file, a directory of files or a glob pattern.

  • Parameters

    adlfs: ADL filesystem instance

    rpath: str

      remote path/globstring to use to find remote files. Recursive glob
      patterns using ** are not supported.
    

    lpath: str

      local path. If downloading a single file, will write to this specific
      file, unless it is an existing directory, in which case a file is
      created within it. If downloading multiple files, this is the root
      directory to write within. Will create directories as required.
    

    nthreads: int [None]

      Number of threads to use. If None, uses the number of cores.
    

    chunksize: int [2**28]

      Number of bytes for a chunk. Large files are split into chunks. Files
      smaller than this number will always be transferred in a single thread.
    

    buffersize: int [2**22]

      Ignored in current implementation.
      Number of bytes for internal buffer. This block cannot be bigger than
      a chunk and cannot be smaller than a block.
    

    blocksize: int [2**22]

      Number of bytes for a block. Within each chunk, we write a smaller
      block for each API call. This block cannot be bigger than a chunk.
    

    client: ADLTransferClient [None]

      Set an instance of ADLTransferClient when finer-grained control over
      transfer parameters is needed. Ignores nthreads and chunksize set
      by constructor.
    

    run: bool [True]

      Whether to begin executing immediately.
    

    overwrite: bool [False]

      Whether to forcibly overwrite existing files/directories. If False and
      the local path is a directory, will quit regardless of whether any files
      would be overwritten. If True, only matching filenames are actually
      overwritten.
    

    progress_callback: callable [None]

      Callback for progress with signature function(current, total) where
      current is the number of bytes transferred so far, and total is the
      size of the blob, or None if the total size is unknown.
    

    timeout: int (0)

      Default value 0 means infinite timeout. Otherwise, time in seconds before the
      process will stop and raise an exception if the transfer is still in progress.
    
  • Attributes

    hash

Methods

active(self)

Return whether the downloader is active

static clear_saved()

Remove references to all persisted downloads.

static load()

Load list of persisted transfers from disk, for possible resumption.

  • Returns

    A dictionary of download instances, keyed by auto-generated unique
    hashes. The state of the chunks (completed, errored, etc.) can be
    seen in the status attribute. Instances can be resumed with `run()`.
    

run(self, nthreads=None, monitor=True)

Populate transfer queue and execute downloads

  • Parameters

    nthreads: int [None]

      Override default nthreads, if given
    

    monitor: bool [True]

      To watch and wait (block) until completion.
    

save(self, keep=True)

Persist this download

Saves a copy of this transfer process in its current state to disk. This is done automatically for a running transfer, so that as a chunk is completed, this is reflected. Thus, if a transfer is interrupted, e.g., by user action, the transfer can be restarted at another time. All chunks that were not already completed will be restarted at that time.

See the load method to retrieve saved transfers and run to resume a stopped transfer.

  • Parameters

    keep: bool (True)

      If True, the transfer will be saved if some chunks remain to be
      completed; otherwise, it will be removed from the saved state.
    

successful(self)

Return whether the downloader completed successfully.

It will raise AssertionError if the downloader is active.

azure.datalake.store.lib.auth(tenant_id=None, username=None, password=None, client_id='', client_secret=None, resource='https://datalake.azure.net/', require_2fa=False, authority=None, retry_policy=None, **kwargs)

User/password authentication

  • Parameters

    tenant_id: str

      associated with the user’s subscription, or “common”
    

    username: str

      active directory user
    

    password: str

      sign-in password
    

    client_id: str

      the service principal client
    

    client_secret: str

      the secret associated with the client_id
    

    resource: str

      resource for auth (e.g., https://datalake.azure.net/)
    

    require_2fa: bool

      indicates this authentication attempt requires two-factor authentication
    

    authority: string

      The full URI of the authentication authority to authenticate against (such as https://login.microsoftonline.com/)
    

    kwargs: key/values

      Other parameters, for future use
    
  • Returns

    A DataLakeCredential object

azure-data-lake-store-python's People

Contributors

akharit, asikaria-msft, begoldsm, bluca, chdevala, elvisace, jbcrail, lmazuel, markusweimer, martindurant, matt1883, microsoft-github-policy-service[bot], milanchandna, mvds00, ro-joowan, rrenfrow86, uranusjr


azure-data-lake-store-python's Issues

Upload 100GB file on machine with 112GB of memory results in OOM error and cascading errors

When I attempt to upload a 100GB file with 128 threads on a vm with 112GB of memory I get an out of memory error, followed by a large number of IO errors on closed streams.

When an error is encountered it should not immediately result in cascading errors if possible, and retry should be considered (although with OOM errors that is probably not practical).

Below is a partial stack trace. The IO errors continue for a long time:

PS E:\ingress> python C:\tools\sampleUploadDownload.py
Uploading file...
Traceback (most recent call last):
File "C:\tools\sampleUploadDownload.py", line 24, in
multithread.ADLUploader(adl, lpath=fileLocationToUpload, rpath=remoteFolderName + '/' + remoteFileName, nthreads=128
) # change the thread number up or down depending on tuning for perf
MemoryError
Exception ignored in: <bound method AzureDLFile.__del__ of <ADL file: /tmp/e683991075_35701915648>>
Traceback (most recent call last):
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 702, in __del__
self.close()
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 685, in close
self.flush(force=True)
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 628, in flush
if self.buffer.tell() == 0:
ValueError: I/O operation on closed file.
Exception ignored in: <bound method AzureDLFile.__del__ of <ADL file: /tmp/e683991075_42949672960>>
Traceback (most recent call last):
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 702, in __del__
self.close()
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 685, in close
self.flush(force=True)
File "c:\tools\azure-data-lake-store-python\adlfs\core.py", line 628, in flush
if self.buffer.tell() == 0:
ValueError: I/O operation on closed file.

AccessControlException after file upload

I am trying to upload the content of the local file into a DataLake store, with the following simple snippet:

token = lib.auth(tenant_id='782633d2-40ee-4e13-a016-xxxx', client_id='6a706405-9ef2-440c-9585-xxxxx', client_secret='yyyy=')
adl = AzureDLFileSystem(store_name='mystore', token=token)

adl.put('updates/update.xml', update.xm)

I can see the file is a data explorer, with a non-zero size, however, once I click on the file, I get

 Error
AccessControlException
Message
FsOpenStream failed with error 0x83090aa2 (Either the resource does not exist or the current user is not authorized to perform the requested operation).

 [58c2c59a-1a6d-42da-aaff-9f9bde8216be] failed with error 0x83090aa2 (Either the resource does not exist or the current user is not authorized to perform the requested operation).

 [58c2c59a-1a6d-42da-aaff-9f9bde8216be][2016-10-06T00:44:11.1559029-07:00]

I can only delete the file from the web browser. My user is the owner of the root Data Lake Store folder.

PRI 0: Uploading 100GB file results in 200GB on the server

Using the multi-part uploader is not resulting in valid files being uploaded to the server. On completion of a 100GB file upload, checking the status of that file reports back a file that is 200GB.

It is critical that the multi-part upload and download logic have self-checks that confirm they have uploaded/download as much data as they are supposed to, and to report problems/failures if there is too much data.

As an example, there can be scenarios on the server where an append request "fails" from the client perspective (due to a timeout or something similar) but is committed on the server. In this case we should make sure that when we think we have finished uploading or downloading a segment that it is actually as long as we expect it to be, and to discard it/error out if it is not the right size.

More flexibility with trace logging

Currently we see a lot of general debug logging when executing, especially on large files/folders. Ideally, it would be good to be able to turn this on/off and have the ability to tweak verbosity (from debug all the way to just error cases). I am not sure if this is possible, but it would also be good to be able to optionally print out the request/response in a nice format. For example, PowerShell today hooks into the tracing logging that takes place within the standard .NET sdk to enable things like this if the user wants to debug what is actually being sent:

Get-AzureRmDataLakeAnalyticsAccount -Debug
DEBUG: ============================ HTTP REQUEST ============================

HTTP Method:
GET

Absolute Uri:
https://api-dogfood.resources.windows-int.net/subscriptions/90585f39-ab85-4921-bb37-0468f7efd1dd/providers/Microsoft.Da
taLakeAnalytics/accounts?api-version=2015-10-01-preview

Headers:
x-ms-client-request-id : fe5fda9a-fd40-475f-9dec-a35c2ae7fe0c
accept-language : en-US

Body:

DEBUG: ============================ HTTP RESPONSE ============================

Status Code:
OK

Headers:
Pragma : no-cache
x-ms-original-request-ids :
ed2622ab-58bf-4cef-b960-650f971e6ada,eea2a95c-20e1-49c9-ba4a-310e31b3ffd4,dc27ecf2-8a38-4d01-94f7-ddd8f9bdc171
x-ms-ratelimit-remaining-subscription-reads: 14863
x-ms-request-id : c55140b3-5d56-409e-a5c3-2d668b8b1134
x-ms-correlation-request-id : c55140b3-5d56-409e-a5c3-2d668b8b1134
x-ms-routing-request-id : CENTRALUS:20160901T233000Z:c55140b3-5d56-409e-a5c3-2d668b8b1134
Strict-Transport-Security : max-age=31536000; includeSubDomains
Cache-Control : no-cache
Date : Thu, 01 Sep 2016 23:29:59 GMT

Body:
{
"value": [
{
"properties": {
"provisioningState": "Succeeded",
"state": "Active",
"endpoint": "e2etestkonappebn3p.konaaccountdogfood.net",
"accountId": "61ab843c-2f11-42cc-9f1d-851b69d1281f",
"creationTime": "2016-08-11T22:58:35.9195777Z",
"lastModifiedTime": "2016-08-11T22:58:35.9195777Z"
},
"location": "brazilsouth",
"tags": null,
"id":
"/subscriptions/90585f39-ab85-4921-bb37-0468f7efd1dd/resourceGroups/konagroup-ppe-bn3p/providers/Microsoft.DataLakeAnal
ytics/accounts/e2etestkonappebn3p",
"name": "e2etestkonappebn3p",
"type": "Microsoft.DataLakeAnalytics/accounts"
} ...
}

Meta: Test scenarios

Cancel upload/download

  • Validate that the final file does not exist
  • Resume the upload/download
  • Validate that it completes successfully

Validate retry logic when a basic REST call fails for uploader/downloader

  • Mock a failure in append/create. Validate the operation retries and eventually fails
  • Resume the upload/download with mocked failure remove, verify it succeeds

Verify overwrite logic

  • Files/folders should not be overwritten if the user does not indicate a force operation
  • Files/folders should be overwritten if the user does indicate force (default should not be force)

Verify record boundary splitting is honored

  • If this is passed in, verify that it is honored.
  • Verify that if the user passes in this boundary splitting and the data is >4MB when a boundary is found, that we throw (since we cannot support records that are longer than 4mb).

Progress tracking/state validation

  • Validate that metadata about the upload/download is correct upon completion/error termination
  • This should also be done during the resume tests to ensure the metadata is accurate/useable.

Task 4.3: Package publishing

Get the functionality ready for package publishing, which includes ensuring our getting started documentation, samples and readthedocs code documentation is ready and has been reviewed.

Lower Pri: Large folder upload/download has a large startup cost in time

When uploading or downloading a very large directory, we see a large startup cost where the client is getting all the information it needs to begin (or resume) the operation. This could benefit from some parallelization or optimizations to reduce the amount of time needed to setup the operation.

The current timing is about 30 seconds for a folder with 10,000 1mb files in a nested structure.

Lots of memory errors when attempting to upload/download large files

When attempting to upload or download a large file (such as a 10GB file) I am constantly running into MemoryErrors. Looking at how much memory Python is using during this time I see that it is right around 2GB. This seems like a lot of memory taken to process a 10GB file. Additionally, 2GB of memory use shouldn't be enough to cause these failures in the python runtime (although I imagine that is just a setting somewhere). Ideally we shouldn't be scaling memory usage to be 20% of the file size, since we need to support the ability to upload files in the terabyte range (for upload and download).

Cancel logic hangs on large folders

On a folder containing 100,000 1mb files if I issue a cancel request the operation still hasn't returned control back to me after about 30 minutes (and it appears to still be uploading, very slowly).

Replace CONCAT with MSCONCAT

There is a design limitation in CONCAT due to passing all of the paths on the URI, which can cause failures when concatenating a large number of files. Until this is fixed in a new WebHDFS protocol, please use the MSCONCAT op code and pass the paths in the body of the request as an application/octet-stream, using the following format for the stream in the body:
sources=<comma-separated list of full paths>

To see more information about this API you can see the swagger specification for it here:
https://github.com/Azure/azure-rest-api-specs/blob/master/arm-datalake-store/filesystem/2015-10-01-preview/swagger/filesystem.json#L335

Additionally, for further optimizations for concat please see issue: #49

Optimization for APPEND, CREATE and OPEN

To avoid redirects and extra web calls, the following query parameters should be added to the APPEND, CREATE and OPEN operations:
APPEND: append=true
CREATE: write=true
OPEN: read=true

Documentation

Placeholder: no part of this project can be considered complete without adequate documentation, both as generated from docstrings, and explicit explanations of motivation, typical use etc.

Uploader fails if the full path to the target file doesn't exist

Creation of a file in ADLS does not require the underlying path to exist. The create call itself will create the entire path or throw if it can't. The recommendation is to remove this requirement and fail only if the target file itself can't be created (which should still be tested for at the very beginning).

Block size variable not set for non-delimited files

When opening non-delimited files in either wb or ab mode, the blocksize instance variable isn't set and causes some tests to fail with this error:

AttributeError: 'AzureDLFile' object has no attribute 'blocksize'

Pri 0: Uploading 50,000 1mb files resulting in a memory exception

The following exception is thrown when attempting to upload 50,000 1MB files. Note that it keeps going on like this for a very long time:

Uploading 50000 1MB files...
Traceback (most recent call last):
File "C:\tools\sampleUploadDownload.py", line 36, in
multithread.ADLUploader(adl, lpath='D:\ingress\largeFolder', rpath=remoteFolderName + '/50000files', nthreads=64)

change the thread number up or down depending on tuning for perf

File "c:\tools\azure-data-lake-store-python\adlfs\multithread.py", line 233, in init
self.run()
File "c:\tools\azure-data-lake-store-python\adlfs\multithread.py", line 271, in run
self.client.run(nthreads, monitor)
File "c:\tools\azure-data-lake-store-python\adlfs\transfer.py", line 337, in run
self.monitor()
File "c:\tools\azure-data-lake-store-python\adlfs\transfer.py", line 370, in monitor
self._wait(poll, timeout)
File "c:\tools\azure-data-lake-store-python\adlfs\transfer.py", line 352, in _wait
self._update()
File "c:\tools\azure-data-lake-store-python\adlfs\transfer.py", line 326, in _update
self.save()
File "c:\tools\azure-data-lake-store-python\adlfs\transfer.py", line 410, in save
pickle.dump(all_downloads, f)
MemoryError
Unhandled exception in thread started by
Unhandled exception in thread started by Unhandled exception in thread started by

Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-173, started daemon 11736)>>
Traceback (most recent call last):
Unhandled exception in thread started by
Unhandled exception in thread started by Unhandled exception in thread started by
Exception ignored in: <generator object CaseInsensitiveDict.iter.. at 0x37CD71D8>
Traceback (most recent call last):
Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-149, started daemon 17448)>>
Traceback (most recent call last):
Traceback (most recent call last):
Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-167, started daemon 4856)>>
Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-135, started daemon 18448)>>
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 66, in _work
er
Traceback (most recent call last):
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 66, in _work
er
Traceback (most recent call last):
Traceback (most recent call last):
<bound method Thread._bootstrap of <Thread(Thread-155, started daemon 6328)>>
Exception in thread Thread-189:
Traceback (most recent call last):
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 64, in _work
er
MemoryError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\threading.py", line 914, in bootstrap_inner
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\threading.py", line 862, in run
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 81, in work
er
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\logging__init
.py", line 1326, in critical
MemoryError

File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 66, in _work
er
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 64, in _work
er
Traceback (most recent call last):
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 64, in _work
er
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 66, in _work
er
Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-160, started daemon 17808)>>
Unhandled exception in thread started by <bound method Thread._bootstrap of <Thread(Thread-174, started daemon 14356)>>
MemoryError
MemoryError:
MemoryError

During handling of the above exception, another exception occurred:

File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 64, in _work
er
Traceback (most recent call last):
Exception ignored in: <bound method AzureDLFile.__del__ of <ADL file: /begoldsm/50000files/f1/f2/16926>>
Traceback (most recent call last):
work_item.run()
File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 66, in _work
er
Exception ignored in: <bound method AzureDLFile.__del__ of <ADL file: /begoldsm/50000files/f1/11838>>
Traceback (most recent call last):

During handling of the above exception, another exception occurred:

MemoryError

During handling of the above exception, another exception occurred:

MemoryError

During handling of the above exception, another exception occurred:

File "C:\Users\adlsperf\AppData\Local\Programs\Python\Python35-32\lib\concurrent\futures\thread.py", line 64, in _work
er
MemoryError

ADLUploader can silently fail on a segment, resulting in perceived success but no target file created

More logging, segment validation and feedback to the caller is required to indicate the health/status of uploads. For example, for a 100GB file we are seeing execution seem to succeed (no errors reported and the final message is output). It is important that even with the least noisy level of logging that we surface valid status messages to the caller, even if it is just a true/false message indicating that, from our perspective, the upload/download succeeded or failed.

Improvement Recommendation: More unique "tmp" folder per file/folder upload

This is a minor optimization which helps with the finalization step for each file. There is an option within the concatenate call: op=MSCONCAT&deleteSourceDirectory=true.

This call, instead of deleting each file individually, deletes the entire folder instead. This can be used if the only contents in the folder are just the contents for the file being concatenated and it drastically increases the performance of the concatenate operation.
Additionally, MSCONCAT should be the operation used (not op=CONCAT) due to the fact that, for very large files (or paths) putting all of those in the URI can result in failure. MSCONCAT puts all of that content in the body instead, which has a much larger capacity.

PRI0: Upload fails but reports success

Upload of a 50gb file is failing, even though no error is returned to the caller. This bug has two issues:

  1. There is not enough validation taking place in the upload to properly report success/failure to the caller with confidence
  2. Upload is failing consistently, which needs to be addressed and validation for it must be included in future runs.

With debug logs active I see that a lot of transfers are failing, most don't have any error at all associated with them, and some are indicating I/O errors due to accessing closed files (log attached).

uploadLog.txt

Task 5.1: Convenience layer for auto-generated clients

This is much lower priority than the previous four milestones, but if we have time it would be good to go over the auto-generated client functionality for our other four clients and see if there are any good quality of life improvements we can make for users.

Can no longer pass the store name or url_suffix into AzureDLFileSystem. Only option is env variable

In the comments it indicates there is a param called "store". In reality, this is the constructor definition for AzureDLFileSystem (including the comments of what the args are claimed to be as well):

store : str ("")
Store name to connect to
token : dict
When setting up a new connection, this contains the authorization
credentials (see lib.auth()).
url_suffix: str (None)
Domain to send REST requests to. The end-point URL is constructed
using this and the store_name. If None, use default.
kwargs: optional key/values
For auth, such as username, password. See lib.auth()
def __init__(self, token=None, **kwargs)

Note that store and url_suffix are both missing from the list.

Auth with user name and password does not work

token = lib.auth(tenant_id, username, password)

gives

adal.adal_error.AdalError: Get Token request returned http error: 400 and server response: {"error":"invalid_request","error_description":"AADSTS90014: The request body must contain the following parameter: 'client_id'.\r\nTrace ID: 899e028b-469f-4dba-ab97-518e5a3e42e5\r\nCorrelation ID: 24bd2ce3-b084-40ab-9e8f-46f5a54ac501\r\nTimestamp: 2016-10-06 09:21:19Z","error_codes":[90014],"timestamp":"2016-10-06 09:21:19Z","trace_id":"899e028b-469f-4dba-ab97-518e5a3e42e5","correlation_id":"24bd2ce3-b084-40ab-9e8f-46f5a54ac501"}

and according to this line https://github.com/Azure/azure-data-lake-store-python/blob/dev/azure/datalake/store/lib.py#L106

it cannot work just with username and password.

PRI 0: Use the Offset parameter in append to ensure we are always appending at the offset we expect

The append API allows for a query parameter called offset, which allows you to specify where you think the end of the file is to append data to. This is very useful for us, since it allows us to submit an append call where we "think" the file should currently be. If data has already been uploaded at that location, you will receive a 400 error with a BadOffsetException, which you can catch. This will indicate that data is already at that location and we can move on to the next 4mb chunk to upload. This will help preserve the uploaded file and ensure that its final length is the same as the source file.

Here is a sample request/response where the offset is not the end of the file (meaning there is already data at or ahead of the offset specified):

Request:
POST https://adlspysample01.azuredatalakestore.net/webhdfs/v1/sample.txt?op=APPEND&append=true&offset=4&api-version=2015-10-01-preview HTTP/1.1
x-ms-client-request-id: 33401497-4ac0-451e-ab13-eccd305e3706
accept-language: en-US
Authorization:
User-Agent: Microsoft.Azure.Management.DataLake.Store.DataLakeStoreFileSystemManagementClient/0.12.6-preview AzurePowershell/v2.1.0.0
CommandName: Add-AzureRmDataLakeStoreItemContent
ParameterSetName: __AllParameterSets
Content-Type: application/octet-stream
Host: adlspysample01.azuredatalakestore.net
Content-Length: 4
Expect: 100-continue

test

Response
HTTP/1.1 400 Bad Request
Cache-Control: no-cache
Pragma: no-cache
Content-Length: 260
Content-Type: application/json; charset=utf-8
Expires: -1
x-ms-request-id: a8b65992-21f3-4c5d-8fe0-f0b49d0320f1
Server-Perf: [a8b65992-21f3-4c5d-8fe0-f0b49d0320f1][ AuthTime::0::PostAuthTime::0 ][S-FsOpenStream :: 00:00:010 ms]%0a[BufferingTime :: 00:00:000 ms]%0a[WriteTime :: 00:00:000 ms]%0a[S-FsAppendStream :: 00:00:036 ms]%0a[S-FsCloseHandle :: 00:00:001 ms]%0a[APPEND :: 00:00:269 ms]%0a
x-ms-webhdfs-version: 16.07.18.01
Status: 0x83090015
X-Content-Type-Options: nosniff
Strict-Transport-Security: max-age=15724800; includeSubDomains
Date: Thu, 15 Sep 2016 01:21:15 GMT

{"RemoteException":{"exception":"BadOffsetException","message":"FsAppendStream failed with error 0x83090015 (Bad offset). [a8b65992-21f3-4c5d-8fe0-f0b49d0320f1][2016-09-14T18:21:16.0457431-07:00]","javaClassName":"org.apache.hadoop.fs.adl.BadOffsetException"}}

General view of the SDK choices

It would be very helpful for us to have a big picture of the choices you made for the SDK architecture. Clearly you have experience with big data services, and it's amazing how the project is advancing. But I would like to take a step back to better understand how amazing it is :)

Example:

  • What is dask and why do you think it's important to follow this framework? Why do you think future adlfs customers, who are Data Lake specialists, will be glad that the lib follows dask?
  • What are our competitors, like Amazon, Google, etc., doing? Are they following dask too? If not, why not choose to follow their pattern to facilitate the transition to Azure Data Lake?

Thanks

urls are hard-coded

Should allow option to set the REST base URLs of the various user-facing interfaces.

Download operations failing silently sometimes

We are seeing issues where, unless progress tracking is being printed out, download fails without reporting any error messages, resulting in a corrupt downloaded file and the perception of success.

Example output:
remote file : /begoldsmtest/50_1GB_Files
remote file size: 52425000000
[errored] file begoldsmtest/50_1GB_Files/14 -> D:\ingress\50_1GB_Files.out\14, chunk D:\ingress\50_1GB_Files.out\14 0
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/14 -> D:\ingress\50_1GB_Files.out\14
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/36 -> D:\ingress\50_1GB_Files.out\36
[errored] file begoldsmtest/50_1GB_Files/45 -> D:\ingress\50_1GB_Files.out\45, chunk D:\ingress\50_1GB_Files.out\45 805306368
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/45 -> D:\ingress\50_1GB_Files.out\45
[errored] file begoldsmtest/50_1GB_Files/30 -> D:\ingress\50_1GB_Files.out\30, chunk D:\ingress\50_1GB_Files.out\30 0
[errored] file begoldsmtest/50_1GB_Files/30 -> D:\ingress\50_1GB_Files.out\30, chunk D:\ingress\50_1GB_Files.out\30 536870912
[ 2/ 4 chunks] begoldsmtest/50_1GB_Files/30 -> D:\ingress\50_1GB_Files.out\30
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/18 -> D:\ingress\50_1GB_Files.out\18
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/31 -> D:\ingress\50_1GB_Files.out\31
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/17 -> D:\ingress\50_1GB_Files.out\17
[errored] file begoldsmtest/50_1GB_Files/26 -> D:\ingress\50_1GB_Files.out\26, chunk D:\ingress\50_1GB_Files.out\26 805306368
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/26 -> D:\ingress\50_1GB_Files.out\26
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/29 -> D:\ingress\50_1GB_Files.out\29
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/11 -> D:\ingress\50_1GB_Files.out\11
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/50 -> D:\ingress\50_1GB_Files.out\50
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/39 -> D:\ingress\50_1GB_Files.out\39
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/10 -> D:\ingress\50_1GB_Files.out\10
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/7 -> D:\ingress\50_1GB_Files.out\7
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/12 -> D:\ingress\50_1GB_Files.out\12
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/3 -> D:\ingress\50_1GB_Files.out\3
[errored] file begoldsmtest/50_1GB_Files/9 -> D:\ingress\50_1GB_Files.out\9, chunk D:\ingress\50_1GB_Files.out\9 268435456
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/9 -> D:\ingress\50_1GB_Files.out\9
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/20 -> D:\ingress\50_1GB_Files.out\20
[errored] file begoldsmtest/50_1GB_Files/37 -> D:\ingress\50_1GB_Files.out\37, chunk D:\ingress\50_1GB_Files.out\37 0
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/37 -> D:\ingress\50_1GB_Files.out\37
[errored] file begoldsmtest/50_1GB_Files/33 -> D:\ingress\50_1GB_Files.out\33, chunk D:\ingress\50_1GB_Files.out\33 0
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/33 -> D:\ingress\50_1GB_Files.out\33
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/22 -> D:\ingress\50_1GB_Files.out\22
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/19 -> D:\ingress\50_1GB_Files.out\19
[errored] file begoldsmtest/50_1GB_Files/23 -> D:\ingress\50_1GB_Files.out\23, chunk D:\ingress\50_1GB_Files.out\23 805306368
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/23 -> D:\ingress\50_1GB_Files.out\23
[errored] file begoldsmtest/50_1GB_Files/5 -> D:\ingress\50_1GB_Files.out\5, chunk D:\ingress\50_1GB_Files.out\5 0
[ 3/ 4 chunks] begoldsmtest/50_1GB_Files/5 -> D:\ingress\50_1GB_Files.out\5
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/6 -> D:\ingress\50_1GB_Files.out\6
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/47 -> D:\ingress\50_1GB_Files.out\47
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/16 -> D:\ingress\50_1GB_Files.out\16
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/8 -> D:\ingress\50_1GB_Files.out\8
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/25 -> D:\ingress\50_1GB_Files.out\25
[errored] file begoldsmtest/50_1GB_Files/28 -> D:\ingress\50_1GB_Files.out\28, chunk D:\ingress\50_1GB_Files.out\28 805306368
[errored] file begoldsmtest/50_1GB_Files/28 -> D:\ingress\50_1GB_Files.out\28, chunk D:\ingress\50_1GB_Files.out\28 268435456
[ 2/ 4 chunks] begoldsmtest/50_1GB_Files/28 -> D:\ingress\50_1GB_Files.out\28
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/32 -> D:\ingress\50_1GB_Files.out\32
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/40 -> D:\ingress\50_1GB_Files.out\40
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/13 -> D:\ingress\50_1GB_Files.out\13
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/49 -> D:\ingress\50_1GB_Files.out\49
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/1 -> D:\ingress\50_1GB_Files.out\1
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/43 -> D:\ingress\50_1GB_Files.out\43
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/21 -> D:\ingress\50_1GB_Files.out\21
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/34 -> D:\ingress\50_1GB_Files.out\34
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/48 -> D:\ingress\50_1GB_Files.out\48
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/42 -> D:\ingress\50_1GB_Files.out\42
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/44 -> D:\ingress\50_1GB_Files.out\44
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/46 -> D:\ingress\50_1GB_Files.out\46
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/27 -> D:\ingress\50_1GB_Files.out\27
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/41 -> D:\ingress\50_1GB_Files.out\41
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/4 -> D:\ingress\50_1GB_Files.out\4
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/2 -> D:\ingress\50_1GB_Files.out\2
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/15 -> D:\ingress\50_1GB_Files.out\15
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/35 -> D:\ingress\50_1GB_Files.out\35
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/24 -> D:\ingress\50_1GB_Files.out\24
[ 4/ 4 chunks] begoldsmtest/50_1GB_Files/38 -> D:\ingress\50_1GB_Files.out\38
[bench_download_50_1gb] finished in 180.0904s
16ca4ecebb7987748097be6a87a98c20 D:\ingress\50_1GB_Files
3f38b1eccd3f79f2fde9b0e032c37449 D:\ingress\50_1GB_Files.out

ADLUploader defaults to overwrite if file exists

The operation should fail, and the user should be told that they must explicitly pass a force/overwrite flag to replace an existing file when using ADLUploader. Today, re-running the same upload command against the same target location succeeds silently instead of complaining.
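A minimal sketch of the requested guard, assuming the filesystem object exposes an exists() check; the helper and the overwrite flag are hypothetical, not the current ADLUploader API:

def check_target(adl, rpath, overwrite=False):
    # Refuse to clobber an existing remote file unless the caller opts in.
    if adl.exists(rpath) and not overwrite:
        raise OSError(
            '%s already exists; pass an explicit overwrite/force flag to replace it' % rpath)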

Azure Python strong recommendation: match the naming conventions in os and os.path

@martindurant I just got some feedback from the Azure Python SDK team. They are pushing for our filesystem interface to match the naming conventions and styles that exist in os and os.path.
Another recommendation is a slight change toward more descriptive naming:

  • ls -> listdir or scandir (plus stat for a single item)
  • cat and head -> readlines

I think these renames also mirror the os module and give a more general set of methods that is not tied to any one OS's naming.
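Purely as an illustration of the proposal (none of this exists in the library today), the os-style names could be thin aliases over the current methods:

from azure.datalake.store import core

class OSNamedADLFileSystem(core.AzureDLFileSystem):
    # Hypothetical aliases following os / os.path naming.
    def listdir(self, path=''):
        return self.ls(path)                # names only, like os.listdir
    def scandir(self, path=''):
        return self.ls(path, detail=True)   # per-item dictionaries, akin to os.scandir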

Clients need to handle refresh token if supplied during client construction

I mentioned this in our Slack conversation last week; I'm adding it here to keep it tracked.
We need to ensure that we maintain the same level of support and functionality that exists in the AutoRest clients. In those clients there is a base class called ServiceClient which is ultimately responsible for all HTTP requests being sent and received. This class has logic that handles token expiration and attempts to refresh the token using the credentials that were initially provided during client construction. Our client must do the same to ensure that we don't end up with expired tokens during long-running operations (such as file upload/download).

Here is a link to the specific logic within AutoRest that handles the refresh: https://github.com/Azure/autorest/blob/45ddc1a84c58f5732a2f77418079c9439f39e892/src/client/Python/msrest/msrest/service_client.py#L189

The basic algorithm is:

  1. Attempt to send the request.
  2. Catch invalid_grant and token-expired errors.
  3. On those errors, trigger a token refresh and resend the request.
  4. If the request still fails with invalid_grant or token-expired, raise a token expired error (see the sketch below).
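A minimal sketch of that loop, with a hypothetical exception type and callables standing in for the real request and credential machinery:

class TokenExpiredError(Exception):
    """Hypothetical: raised on invalid_grant / token-expired responses."""

def send_with_refresh(send_request, refresh_credentials):
    # Step 1: attempt the request; steps 2-3: on expiry, refresh and resend;
    # step 4: a second failure propagates to the caller as the expiry error.
    try:
        return send_request()
    except TokenExpiredError:
        refresh_credentials()
        return send_request()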

Uploaded temp files should be 256mb in size

The chunking process needs to be two-tiered:

  1. At most 4 MB of data is uploaded in a single HTTP request.
  2. The intermediate files should be as close to 256 MB as possible.

This is important for a couple of reasons:

  1. A 4 MB upload chunk size is the only way to guarantee atomicity of the upload operation for a single request. For example, if you upload 16 MB in a single request and it fails, on retry you need to determine whether 0, 4, 8, or 12 MB of data were committed (since the server always commits in 4 MB chunks).
  2. The intermediate files should be as close to 256 MB as possible to avoid massive fragmentation and lifecycle performance problems once the data is in the system; 256 MB is the optimal size in our system for chunks to be concatenated at.

So, for example, a 10 GB file should be split into ~40 intermediate files of 256 MB each, and each of those can be uploaded in parallel using <= 4 MB requests.
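As a back-of-the-envelope check of the numbers above (sizes only; the variable names are illustrative):

import math

file_size   = 10 * 2**30     # the 10 GB example
segment     = 256 * 2**20    # target intermediate-file size
request_max = 4 * 2**20      # maximum data per HTTP request

print(math.ceil(file_size / segment))     # 40 intermediate files
print(math.ceil(segment / request_max))   # 64 requests per intermediate file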

Place temporary files under destination path

As mentioned by Ben, we should place uploaded chunks in a temporary directory under the destination path. The temporary directory/file names proposed below are in keeping with other SDK implementations.

If the destination file is /user/deeperFolder/bar.txt, then the temporary directory will be:

/user/deeperFolder/bar.txt.segments.{random string}/

Within the temporary directory, the temporary chunk filenames will be:

bar.txt.{random string}.segment{int}

where {int} is the chunk number.
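An illustrative sketch of the proposed layout (uuid stands in for the random string; the helper itself is hypothetical):

import posixpath
import uuid

def segment_paths(destination, n_chunks):
    name = posixpath.basename(destination)                    # e.g. 'bar.txt'
    rand = uuid.uuid4().hex[:8]
    tmp_dir = '{0}.segments.{1}'.format(destination, rand)
    return [posixpath.join(tmp_dir, '{0}.{1}.segment{2}'.format(name, rand, i))
            for i in range(n_chunks)]

# segment_paths('/user/deeperFolder/bar.txt', 2) ->
# ['/user/deeperFolder/bar.txt.segments.<random>/bar.txt.<random>.segment0',
#  '/user/deeperFolder/bar.txt.segments.<random>/bar.txt.<random>.segment1']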

Licensing

All source files need the correct license notice and headers.

Client Usability: Error messages must be actionable and include trace IDs from the server

When the user receives an error from the client it must be actionable. This means it must either:

  1. Tell the user what they did wrong and how to correct it (FileNotFound, AccessDenied, Conflict, etc.), or
  2. Give the user enough information to effectively escalate to service engineers (full error responses from the service, x-ms-client-request-id, traceId, etc.).

Right now, the parallel upload/download client returns a "RuntimeError: Max number of ADL retries exceeded" message. This should be updated to also include the error that kept triggering the retries, so the user knows which underlying failure caused us to abort the run.

This should also be true for all single calls: this is a service and it could return a 500 error at some point (due to service unavailability, for example), and it's important that customers can give service engineers as much information as possible to quickly and correctly identify and resolve their issue.
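For the parallel client, a sketch of what surfacing the underlying failure might look like (the helper and its arguments are illustrative, not the current implementation):

def raise_retry_exhausted(last_exception, request_id):
    # Keep the existing message, but append the last error and the trace id.
    raise RuntimeError(
        'Max number of ADL retries exceeded; last error: {0!r} '
        '(x-ms-client-request-id: {1})'.format(last_exception, request_id))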

Need to implement block size restrictions

a) When writing, each REST call should pass up to 4 MB of data and should end on a delimiter (e.g., b'\n'), if one is given; see the sketch below.
b) When splitting a file for upload, boundaries should fall on a delimiter, if one is given, so we need to scan the file and find delimiters near the split boundaries.
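A minimal sketch of (a): trimming a write buffer so a single call carries at most 4 MB and, when a delimiter is supplied, ends on it (the helper name is illustrative):

REQUEST_MAX = 4 * 2**20  # 4 MB

def bytes_to_send(buffer, delimiter=None, limit=REQUEST_MAX):
    # How many bytes of `buffer` the next REST call should carry.
    block = buffer[:limit]
    if delimiter is None or len(buffer) <= limit:
        return len(block)
    i = block.rfind(delimiter)
    # If no delimiter falls inside the 4 MB window, fall back to a plain block.
    return i + len(delimiter) if i != -1 else len(block)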

Client credential authentication

We currently support getting a token via a username/password combination, but we also need to support authentication via client credentials (client id and secret).
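For reference, the requested call might look something like the following; the keyword names mirror the existing username/password form of lib.auth but are speculative here:

from azure.datalake.store import lib

# Hypothetical client-credential (client id + secret) flow; the resulting token
# would then be passed to AzureDLFileSystem exactly as a password-derived token is.
token = lib.auth(tenant_id='...', client_id='...', client_secret='...')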
