Coder Social home page Coder Social logo

prefect-azure's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

prefect-azure's Issues

Failure in pull step โ€“ directories cause errors in pull with ADLS2 storage

I get this error when running a worker locally:

  File "/site-packages/prefect_azure/deployments/steps.py", line 189, in pull_from_azure_blob_storage
    with open(target, "wb") as f:
IsADirectoryError: [Errno 21] Is a directory: '/private/var/folders/rz/fywxxdh16cn2yw0450110kgr0000gq/T/tmpxz98pl1aprefect'

This is probably because we use Data Lake Storage Gen2 where folders are blobs as well. In basic Azure storage this probably isn't an issue as there folders are only virtual (aka don't exist as blobs).

Not sure how to best detect those blobs which are folders.

(We can easily migrate to basic Azure storage so this is not a blocker. Just important to fix or alternatively document if ADLS2 not supported for time being)

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

aiohttp error when running flows

Error description

The changes in #97 have resulted in the following error on the agent when trying to run a flow:
ImportError: aiohttp package is not installed

Proposed solution

I think this should be fixed by adding the aiohttp package to prefect-azure's requirements.txt. It's already in requirements-dev.txt, which explains why this wasn't caught during development. I'm currently unable to test this properly, otherwise I would have created a PR.

This is what azure-identity has to say about using azure.identity.aio:

This library includes a set of async APIs. To use the async credentials in azure.identity.aio, you must first install an async transport, such as aiohttp.

Workaround

We are currently attempting to work around the problem by explicitly adding aiohttp to our agent's list of requirements.

This seems to solve the above issue, however we've run into another issue instead which I think is unrelated but which we also haven't seen before upgrading from 0.2.6 to 0.2.8:

09:46:50.965 | INFO    | prefect.agent - Submitting flow run 'guid-removed'
09:46:53.772 | ERROR   | prefect.agent - Failed to submit flow run ''guid-removed' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 500, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/usr/local/lib/python3.8/site-packages/prefect_azure/container_instance.py", line 336, in run
    container_group = self._configure_container_group(container)
  File "/usr/local/lib/python3.8/site-packages/prefect_azure/container_instance.py", line 542, in _configure_container_group
    resource_group = resource_group_client.resource_groups.get(
  File "/usr/local/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 76, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/azure/mgmt/resource/resources/v2022_09_01/operations/_operations.py", line 11671, in get
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 202, in run
    return first_node.send(pipeline_request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  [Previous line repeated 2 more times]
  File "/usr/local/lib/python3.8/site-packages/azure/mgmt/core/policies/_base.py", line 46, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 156, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_retry.py", line 448, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 111, in send
    self.on_request(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 91, in on_request
    self._update_headers(request.http_request.headers, self._token.token)
AttributeError: 'coroutine' object has no attribute 'token'

Join output from `_configure_container` into `preview`

"The preview should be able contain generated information too, as long as it doesn't require calls to the actual service e.g. I think the output of _configure_container could probably be in here? The more information we can give the user about what we're generating the better they can debug their configuration."

Originally from here

Worker creates invalid container names

Currently, using the worker to run deployments is broken unless the flow has a short and valid name. This is not a problem when running an old-fashioned agent.

prefect_azure/workers/container_instance.py doesn't do any sort of sanity checking when it comes to creating a container instance name, such as removing spaces or ensuring the name length is within Azure's limits. The invalid logic resides here: https://github.com/PrefectHQ/prefect-azure/blob/8f92ab76cb897eded99eeeed304517959a2caad6/prefect_azure/workers/container_instance.py#L552C46-L552C46

I think it would be better if the naming logic works exactly the same as in prefect_azure/container_instance.py, which lets the user set a name rather than hard-coding it based on the flow name, refrains from adding a prefect-prefix, pads 10 random characters instead of a guid and finally ensures that the name isn't too long for Azure (64 characters).

Pulling flows with sub-directories from Azure blob storage fails with "File exists" error when trying to create the sub-directory

Expectation / Proposal

Being able to use sub-directory when storing flows in Azure blob storage.

Traceback / Example

When we use subdirectory we get the stacktrace bellow when the job tries to pull the flow definition from the Azure blob storage :

The problem seems to come from the file /usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py, at line 249 because it tries to create the sub-dir, that was already created as an empty file, hence the "File exists" error.

I think it could be fixed by wrapping the following code with a condition so that this code is executed only when the current blob is a file, or if the library permit it to change the client.list_blobs(name_starts_with=folder) call to list only blobs that are directory.

target = PurePosixPath(
    local_path
    / relative_path_to_current_platform(blob.name).relative_to(folder)
)
Path.mkdir(Path(target.parent), parents=True, exist_ok=True)
with open(target, "wb") as f:
    client.download_blob(blob).readinto(f)    # <- Here we create a file even for blob that are directory, hence created empty files

Stack trace:

Worker 'KubernetesWorker 13c9d831-880d-48e6-a30e-5f9705c63f7d' submitting flow run '949281bf-48d3-44cd-b3cb-6e307fcc422c'
11:41:22 AM
prefect.flow_runs.worker
Creating Kubernetes job...
11:41:22 AM
prefect.flow_runs.worker
Failed to submit flow run '949281bf-48d3-44cd-b3cb-6e307fcc422c' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 633, in run
    pid = await run_sync_in_worker_thread(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 95, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 884, in _get_infrastructure_pid
    cluster_uid = self._get_cluster_uid(client)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 932, in _get_cluster_uid
    namespace = core_client.read_namespace("kube-system")
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 22686, in read_namespace
    return self.read_namespace_with_http_info(name, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 22765, in read_namespace_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '894d9e18-f3be-4964-9c85-53eb296f6906', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '1956710a-a6c8-4a63-894a-e83ca47b7441', 'X-Kubernetes-Pf-Prioritylevel-Uid': '06f00244-0730-414a-a982-70f4092edecb', 'Date': 'Wed, 28 Feb 2024 10:41:22 GMT', 'Content-Length': '347'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"namespaces \"kube-system\" is forbidden: User \"system:serviceaccount:prefect:prefect-worker\" cannot get resource \"namespaces\" in API group \"\" in the namespace \"kube-system\"","reason":"Forbidden","details":{"name":"kube-system","kind":"namespaces"},"code":403}

11:41:22 AM
prefect.flow_runs.worker
Completed submission of flow run '949281bf-48d3-44cd-b3cb-6e307fcc422c'
11:41:22 AM
prefect.flow_runs.worker
Reported flow run '949281bf-48d3-44cd-b3cb-6e307fcc422c' as crashed: Flow run could not be submitted to infrastructure
11:41:22 AM
prefect.flow_runs.worker
Opening process...
11:41:48 AM
prefect.flow_runs.runner
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 154, in run_steps
    step_output = await run_step(step, upstream_outputs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 125, in run_step
    result = await from_async.call_soon_in_new_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 294, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 319, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_azure/deployments/steps.py", line 188, in pull_from_azure_blob_storage
    Path.mkdir(Path(target.parent), parents=True, exist_ok=True)
  File "/usr/local/lib/python3.11/pathlib.py", line 1116, in mkdir
    os.mkdir(self, mode)
FileExistsError: [Errno 17] File exists: '/opt/prefect/flows'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 427, in retrieve_flow_then_begin_flow_run
    else await load_flow_from_flow_run(flow_run, client=client)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 249, in load_flow_from_flow_run
    output = await run_steps(deployment.pull_steps)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 182, in run_steps
    raise StepExecutionError(f"Encountered error while running {fqn}") from exc
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect_azure.deployments.steps.pull_from_azure_blob_storage
11:42:14 AM
prefect.flow_runs
Process for flow run 'mottled-rabbit' exited cleanly.

Our pull section in the deployment config (from prefect.yaml):

    pull:
      - prefect_azure.deployments.steps.pull_from_azure_blob_storage:
          id: pull_code
          requires: prefect-azure>=0.2.8
          container: flows
          folder: "prefect_workspace/"
          credentials:
            account_url: <account_url>

Our push section in the prefect.yaml

push:
  - prefect_azure.deployments.steps.push_to_azure_blob_storage:
      id: push_code
      requires: prefect-azure>=0.2.8
      container: flows
      folder: prefect_workspace
      credentials:
        account_url: <account_url>

Logic to keep ACI deployment name within Azure character limit error

Expectation / Proposal

The logic to assure the ACI deployment name is within Azure's character limit specification applies len() to a UUID without first casting to a string. Issue is workers/container_instance.py here.

Traceback / Example

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.10/site-packages/prefect_azure/workers/container_instance.py", line 571, in run
    max_length=55 - len(flow_run.id),
TypeError: object of type 'UUID' has no len()

Make the credentials optional for the AzureBlobStorageCredentials Block or for blob interaction in general.

Many users want to configure credentials at the resource level rather than creating a credential and entrusting it to Prefect. For example, we commonly see that Azure KeyVault is used to store secrets and users prefer to only have their secrets stored in one place, so enabling these blocks to function without storing secret values in Prefect is important.

Related to #68.

Make it possible to use blob_storage_download, blob_storage_upload, blob_storage_list and other Azure Storage interactions without storing credentials or secrets in Prefect.

Add `AzureBlobStorageContainer.list_blobs` method

Use case

It could be useful to be able to list the blobs to potentially perform actions on folders and blobs in my code, where I don't know the exact name of a file but I can write code to upload/download things from certain blobs.

Example

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer

my_connection_string = "mysecretstring"
existing_container = "testing314"

credentials = AzureBlobStorageCredentials(
    connection_string=my_connection_string,
)
container = AzureBlobStorageContainer(
    container_name=existing_container,
    credentials=credentials,
)

container.list_blobs()

# upload things to certain blobs

Saving Identities in Work-Pool does not persist to overview / summary

Expectation / Proposal

If I add an identity to an ACI work-pool, the output is not:

  • Persisted to the overview page
  • Is not retrieved or set as previously configured if I go in to edit it again.

This prompts the question of if it's in fact set, or if I need to add it everytime I make a change, or if I make another unrelated change if it's removed (without a value being set).

My expectation is simply that:

  • The identities are saved to the work-pool overview
  • Any work-pool saved identities are pre-populated in the work-pool edit configuration if they already were saved.

image

Traceback / Example

There is no traceback unfortunately, just a review of the experience saving / setting / retrieving.

image
image

Error when using blob_storage_list

I am receiving what seems a pydantic validation error when trying to list blob from storage.

Crash detected! Execution was interrupted by an unexpected exception:
KeyError: 0

If I use list_blobs, from oficial Azure Storage package, it works. The problem happens when I use the prefect azure package.

Steps to reproduce:

Python 3.8.10

start a docker container with Azurite to simulate an Azure Blob Storage. Fwiw, the problem happens in a real Azure Storage to.
docker run --name azurite -d -p 10000:10000 -v blob_storage:/var/lib/azure-blob/data mcr.microsoft.com/azure-storage/azurite azurite-blob --blobHost 0.0.0.0 --loose

The flow bellow will connect to Azurite, create a container, put some files on it, list it and after all, delete the container.

import tempfile
import asyncio
from prefect import flow, get_run_logger
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_list, blob_storage_upload


@flow
def sample_blob_storage_upload(blob_storage_credentials, container):
    
    with tempfile.TemporaryFile("rb") as f:
        for i in range(2):
            blob_storage_upload(
                data=f.read(),
                container=container,
                blob=f"upload_blob_{i}.txt",
                blob_storage_credentials=blob_storage_credentials,
                overwrite=True,
            )
@flow
async def clean(blob_service_client, container_name):
    # remove container
    await blob_service_client.delete_container(container_name)

@flow
async def run_blob_files():
    logger = get_run_logger()
    logger.info("Connecting...")
    connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"
    container_name = "my-container"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )

    blob_service_client = blob_storage_credentials.get_client()

    # create container
    logger.info("Creating container...")
    try:
        await blob_service_client.create_container(container_name)
    except:
        pass
    
    # upload files
    logger.info("Uploading files...")
    sample_blob_storage_upload(blob_storage_credentials, container_name)

    # list files
    logger.warning("Listing files...")

    #ERROR HAPPENS HERE!
    blobs = await blob_storage_list(
        container=container_name,
        blob_storage_credentials=blob_storage_credentials,
    )

    assert blobs == list(range(2))

    # remove container
    logger.info("Removing container...")
    await clean(blob_service_client, container_name)

    logger.info("Closing connection...")
    await blob_service_client.close()
    

asyncio.run(run_blob_files())


Error:

python flows/azure_error.py 
17:09:49.874 | INFO    | prefect.engine - Created flow run 'naughty-taipan' for flow 'run-blob-files'
17:09:50.060 | INFO    | Flow run 'naughty-taipan' - Connecting...
17:09:50.063 | INFO    | Flow run 'naughty-taipan' - Creating container...
17:09:50.077 | INFO    | Flow run 'naughty-taipan' - Uploading files...
17:09:51.935 | INFO    | Flow run 'naughty-taipan' - Created subflow run 'delightful-octopus' for flow 'sample-blob-storage-upload'
17:09:52.040 | INFO    | Flow run 'delightful-octopus' - Created task run 'blob_storage_upload-43105911-0' for task 
'blob_storage_upload'
17:09:52.041 | INFO    | Flow run 'delightful-octopus' - Executing 'blob_storage_upload-43105911-0' immediately...
17:09:52.096 | INFO    | Task run 'blob_storage_upload-43105911-0' - Uploading blob to container my-container with key upload_blob_0.txt
17:09:52.197 | INFO    | Task run 'blob_storage_upload-43105911-0' - Finished in state Completed()
17:09:52.254 | INFO    | Flow run 'delightful-octopus' - Created task run 'blob_storage_upload-43105911-1' for task 
'blob_storage_upload'
17:09:52.258 | INFO    | Flow run 'delightful-octopus' - Executing 'blob_storage_upload-43105911-1' immediately...
17:09:52.303 | INFO    | Task run 'blob_storage_upload-43105911-1' - Uploading blob to container my-container with key upload_blob_1.txt
17:09:52.337 | INFO    | Task run 'blob_storage_upload-43105911-1' - Finished in state Completed()
17:09:52.375 | INFO    | Flow run 'delightful-octopus' - Finished in state Completed('All states completed.')
17:09:52.377 | WARNING | Flow run 'naughty-taipan' - Listing files...
17:09:52.413 | INFO    | Flow run 'naughty-taipan' - Created task run 'blob_storage_list-266e54b8-0' for task 'blob_storage_list'
17:09:52.414 | INFO    | Flow run 'naughty-taipan' - Executing 'blob_storage_list-266e54b8-0' immediately...
17:09:52.463 | INFO    | Task run 'blob_storage_list-266e54b8-0' - Listing blobs from container my-container
17:09:52.471 | ERROR   | Task run 'blob_storage_list-266e54b8-0' - Crash detected! Execution was interrupted by an unexpected exception:
KeyError: 0

17:09:52.500 | ERROR   | Flow run 'naughty-taipan' - Encountered exception during execution:
17:09:52.552 | ERROR   | Flow run 'naughty-taipan' - Finished in state Failed('Flow run encountered an exception. KeyError: 0\n')
Traceback (most recent call last):
  File "flows/azure_error.py", line 65, in <module>
    asyncio.run(run_blob_files())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "flows/azure_error.py", line 50, in run_blob_files
    blobs = await blob_storage_list(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1264, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 257, in return_value_to_state
    return Completed(data=await result_factory.create_result(retval))
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 430, in Completed
    return schemas.states.Completed(cls=cls, **kwargs)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 233, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "pydantic/main.py", line 340, in pydantic.main.BaseModel.__init__
  File "pydantic/main.py", line 1076, in pydantic.main.validate_model
  File "pydantic/fields.py", line 884, in pydantic.fields.ModelField.validate
  File "pydantic/fields.py", line 1094, in pydantic.fields.ModelField._validate_singleton
  File "pydantic/fields.py", line 892, in pydantic.fields.ModelField.validate
  File "pydantic/fields.py", line 1148, in pydantic.fields.ModelField._apply_validators
  File "pydantic/class_validators.py", line 318, in pydantic.class_validators._generic_validator_basic.lambda13
  File "pydantic/main.py", line 716, in pydantic.main.BaseModel.validate
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/azure/storage/blob/_shared/models.py", line 197, in __getitem__
    return self.__dict__[key]
KeyError: 0

requirements.txt

prefect
prefect-azure
adlfs

pip freeze:

adal==1.2.7
adlfs==2022.10.0
aiohttp==3.8.3
aiosignal==1.3.1
aiosqlite==0.17.0
alembic==1.8.1
anyio==3.6.2
apprise==1.2.0
asgi-lifespan==2.0.0
async-timeout==4.0.2
asyncpg==0.27.0
attrs==22.1.0
azure-common==1.1.28
azure-core==1.26.1
azure-datalake-store==0.0.52
azure-identity==1.12.0
azure-mgmt-containerinstance==10.0.0
azure-mgmt-core==1.3.2
azure-mgmt-resource==21.2.1
azure-storage-blob==12.14.1
cachetools==5.2.0
certifi==2022.9.24
cffi==1.15.1
charset-normalizer==2.1.1
click==8.1.3
cloudpickle==2.2.0
colorama==0.4.6
commonmark==0.9.1
coolname==2.0.0
croniter==1.3.7
cryptography==38.0.3
docker==6.0.1
fastapi==0.87.0
frozenlist==1.3.3
fsspec==2022.11.0
google-auth==2.14.1
greenlet==2.0.1
griffe==0.24.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==0.15.0
httpx==0.23.0
hyperframe==6.0.1
idna==3.4
importlib-metadata==5.0.0
importlib-resources==5.10.0
isodate==0.6.1
jsonpatch==1.32
jsonpointer==2.3
kubernetes==25.3.0
Mako==1.2.4
Markdown==3.4.1
MarkupSafe==2.1.1
msal==1.20.0
msal-extensions==1.0.0
msrest==0.7.1
multidict==6.0.2
oauthlib==3.2.2
orjson==3.8.1
packaging==21.3
pathspec==0.10.2
pendulum==2.1.2
portalocker==2.6.0
prefect==2.6.7
prefect-azure==0.2.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
pydantic==1.10.2
Pygments==2.13.0
PyJWT==2.6.0
pyparsing==3.0.9
python-dateutil==2.8.2
python-slugify==6.1.2
pytz==2022.6
pytzdata==2020.1
PyYAML==6.0
readchar==4.0.3
requests==2.28.1
requests-oauthlib==1.3.1
rfc3986==1.5.0
rich==12.6.0
rsa==4.9
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.44
starlette==0.21.0
text-unidecode==1.3
toml==0.10.2
typer==0.7.0
typing-extensions==4.4.0
urllib3==1.26.12
uvicorn==0.19.0
websocket-client==1.4.2
yarl==1.8.1
zipp==3.10.0

Deployment step for azure breaks with' DefaultAzureCredential.get_token' was never awaited

Expectation / Proposal

I am using prefect deploy with prefect_azure.deployments.steps.push_to_azure_blob_storage.
For the credentials I dont want to use a connection string but account url as below:
credentials:
account_url: https://xxx.blob.core.windows.net/

When doing so this leads to usage of DefaultAzureCredential() - which will start an authentication flow as per description in
https://github.com/PrefectHQ/prefect-azure/blob/main/prefect_azure/deployments/steps.py

    Push to an Azure Blob Storage container using an account URL and
    default credentials:
    ```yaml
    push:
        - prefect_azure.deployments.steps.push_to_azure_blob_storage:
            requires: prefect-azure[blob_storage]
            container: my-container
            folder: my-folder
            credentials:
                account_url: https://myaccount.blob.core.windows.net/

Expectation is that this will authenticate.
However, in the code of steps.py we find that we import the DefaultAzureCredential from aio (async), but the steps.py, the container client as well as the rest of the code is written in sync not async.

from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob import ContainerClient

This is why we end up in error about "never awaited".

The problem can be very easily fixed by changing

from azure.identity.aio import DefaultAzureCredential

to its sync counterpart:

from azure.identity import DefaultAzureCredential

Traceback / Example

Traceback (most recent call last):
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 124, in run_steps
step_output = await run_step(step, upstream_outputs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 95, in run_step
result = await from_async.call_soon_in_new_thread(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect_azure/deployments/steps.py", line 119, in push_to_azure_blob_storage
client.upload_blob(str(remote_file_path), f, overwrite=True)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_container_client.py", line 1081, in upload_blob
blob.upload_blob(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py", line 737, in upload_blob
return upload_block_blob(**options)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
response = client.upload(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 845, in upload
pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 230, in run
return first_node.send(pipeline_request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
[Previous line repeated 2 more times]
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 197, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_shared/policies.py", line 520, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 124, in send
self.on_request(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 100, in on_request
self._update_headers(request.http_request.headers, self._token.token)
AttributeError: 'coroutine' object has no attribute 'token'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 383, in call
return self.result()
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
File "/opt/conda/envs/prefect.venv/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/deploy.py", line 257, in deploy
await _run_single_deploy(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/deploy.py", line 519, in _run_single_deploy
await run_steps(push_steps, step_outputs, print_function=app.console.print)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 152, in run_steps
raise StepExecutionError(f"Encountered error while running {fqn}") from exc
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect_azure.deployments.steps.push_to_azure_blob_storage
An exception occurred.
sys:1: RuntimeWarning: coroutine 'DefaultAzureCredential.get_token' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Azure container instance worker needs image field but it's not required

Expectation / Proposal

If I do not complete the image field in my azure container instance worker I get the below error (in the state message) when I try to run a flow.

There's a pretty simple workaround - If I add an image using the work pool edit page in the UI then the flow runs successfully.

Traceback / Example

Submission failed. Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/azure/core/polling/base_polling.py", line 466, in run self._poll() File "/usr/local/lib/python3.10/site-packages/azure/core/polling/base_polling.py", line 500, in _poll raise OperationFailed("Operation failed or canceled") azure.core.polling.base_polling.OperationFailed: Operation failed or canceled During handling of the above exception, another exception occurred: azure.core.exceptions.HttpResponseError: (DeploymentFailed) At least one resource deployment operation failed. Please list deployment operations for details. Please see https://aka.ms/arm-deployment-operations for usage details. Code: DeploymentFailed Message: At least one resource deployment operation failed. Please list deployment operations for details. Please see https://aka.ms/arm-deployment-operations for usage details. Target: /subscriptions/08de200a-6be9-4a50-8a7d-a19b3af79951/resourceGroups/azure-worker/providers/Microsoft.Resources/deployments/prefect-hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b Exception Details: (InvalidContainerImage) The image cannot be empty for container 'eta7hvevnmuwo' in container group 'hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b'. Code: InvalidContainerImage Message: The image cannot be empty for container 'eta7hvevnmuwo' in container group 'hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b'.

Authentication issue when using DefaultAzureCredential for blob storage operations

Expectation / Proposal

The prefect-azure docs mention that for authentication for blob storage operations, you can choose to give the connection string, or leave it out and give the account url (in which case DefaultAzureCredential will be used). The latter is nice because it's passwordless.

However, this does not seem to work?

Azure's library throws an exception: TypeError: object AccessToken can't be used in 'await' expression.

Traceback / Example

Copied the official example, and swapped out the connection values:

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

@flow
def example_blob_storage_upload_flow():
    blob_storage_credentials = AzureBlobStorageCredentials(
        account_url="https://xxx.blob.core.windows.net/",
    )

    with open("data.csv", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="etadata",
            blob="data.csv",
            blob_storage_credentials=blob_storage_credentials,
            overwrite=False,
        )
    return blob

example_blob_storage_upload_flow()

->

Traceback
C:\Users\staljaard\Source\onlyazuretest\.venv\Scripts\python.exe C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py 
15:17:15.454 | INFO    | prefect.engine - Created flow run 'straight-wolf' for flow 'example-blob-storage-upload-flow'
15:17:15.637 | INFO    | Flow run 'straight-wolf' - Created task run 'blob_storage_upload-0' for task 'blob_storage_upload'
15:17:15.638 | INFO    | Flow run 'straight-wolf' - Executing 'blob_storage_upload-0' immediately...
15:17:15.727 | INFO    | Task run 'blob_storage_upload-0' - Uploading blob to container etadata with key data.csv
15:17:18.483 | ERROR   | Task run 'blob_storage_upload-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
15:17:18.533 | ERROR   | Task run 'blob_storage_upload-0' - Finished in state Failed("Task run encountered an exception: TypeError: object AccessToken can't be used in 'await' expression\n")
15:17:18.535 | ERROR   | Flow run 'straight-wolf' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 703, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 17, in example_blob_storage_upload_flow
    blob = blob_storage_upload(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\tasks.py", line 494, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1006, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1171, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
15:17:18.691 | ERROR   | Flow run 'straight-wolf' - Finished in state Failed("Flow run encountered an exception. TypeError: object AccessToken can't be used in 'await' expression\n")
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 26, in <module>
    example_blob_storage_upload_flow()
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\flows.py", line 478, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\client\utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 258, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 703, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 17, in example_blob_storage_upload_flow
    blob = blob_storage_upload(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\tasks.py", line 494, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1006, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1171, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
This happens irrespective of if you call the task async/sync.

(Actually I would like to but I don't have time at this moment ๐Ÿ˜‘)

Make the AzureContainerInstanceJob creds block optional for AzureContainerInstanceCredentials infra block.

As you can see in the screen shot, a credentials block is required.

image

Many users want to configure credentials at the resource level rather than creating a credential and entrusting it to Prefect. For example, we commonly see that Azure KeyVault is used to store secrets and users prefer to only have their secrets stored in one place, so enabling these blocks to function without storing secret values in Prefect is important.

Create Azure Storage Pull Step

Add a project step that allows users to pull their project from a Azure Blob Storage container at a given path. Should accept credentials that can be used for authentication with Azure.

User Assigned Identities non-functional

When adding a user-assigned identity with the work-pool, it's assigned and added as a property of "resources.properties".

Expectation / Proposal

Checking the arm template, this should be a direct property of "resources".

Traceback / Example

The request content was invalid and could not be deserialized: 'Could not find member 'identity' on object of type 'ContainerGroupPropertiesDefinition'. Path 'properties.identity', line 1, position 815.'."

AzureContainerInstanceJob doesn't support Managed Identity to deploy Container Instance for authenticated Image Registry

Expectation / Proposal

AzureContainerInstanceJob only allows the use of username and password for authentication to container registry. Prefect should allow for using Managed Identities to authenticate to Azure Container Registry (ACR). This would allow for agents/flows running on Azure based compute to leverage Azure AD for authentication. This is especially useful in organizations that want to limit access to admin credentials that are used in ACR.

I propose adding an additional option to block for AzureContainerInstanceJob to allow use of an User Assigned Managed Identity and Azure Container registry exclusively.
Suggestion azure_container_registry which would accept both user name/password or Azure Managed Identity.
We then update _configure_container_group method to handle both class options with image _registry being default for backwards compatibility

References -

  1. Azure Python SDK ImageRegistryCredential Class- https://learn.microsoft.com/en-us/python/api/azure-mgmt-containerinstance/azure.mgmt.containerinstance.models.imageregistrycredential?view=azure-python
  2. Rest API Reference - https://learn.microsoft.com/en-us/rest/api/container-instances/container-groups/create-or-update?tabs=HTTP#imageregistrycredential
  3. ARM Deployment Example - https://learn.microsoft.com/en-us/azure/container-instances/using-azure-container-registry-mi#deploy-using-an-azure-resource-manager-arm-template

Traceback / Example

`AzureBlobStorageCredentials` `connection_string` not validated

Perhaps there is another place we could validate it but this seems a good place to as any.

Example

from prefect_azure import AzureBlobStorageCredentials

my_connection_string = "dsfdsf"  # fake

credentials = AzureBlobStorageCredentials(
    connection_string=my_connection_string,
)

The following example also does not fail:

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer

my_connection_string = "dsfdsf"  # fake

credentials = AzureBlobStorageCredentials(
    connection_string=my_connection_string,
)
container = AzureBlobStorageContainer(
    container_name="testing314",  # exists
    credentials=credentials,
)

Providing no credentials to `AzureBlobStorageCredentials` raises wrong error message

When I don't provide anything to my AzureBlobStorageCredentials:

e.g.

credentials = AzureBlobStorageCredentials()

I get this error:

โฏ python qa.py
Traceback (most recent call last):
  File "/Users/bean/code-oss/prefect-azure/qa.py", line 6, in <module>
    credentials = AzureBlobStorageCredentials(
  File "/Users/bean/code-oss/prefect-azure/azurevenv/lib/python3.10/site-packages/prefect/blocks/core.py", line 265, in __init__
    super().__init__(*args, **kwargs)
  File "/Users/bean/code-oss/prefect-azure/azurevenv/lib/python3.10/site-packages/pydantic/v1/main.py", line 341, in __init__
    raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for AzureBlobStorageCredentials
__root__
  Must provide either a connection string or account URL, but not both. (type=value_error)

Because the validator is only doing an xor check, not checking that one of the values is True.

@root_validator
def check_connection_string_or_account_url(
    cls, values: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Checks that either a connection string or account URL is provided, not both.
    """
    has_account_url = values.get("account_url") is not None
    has_conn_str = values.get("connection_string") is not None
    if not bool(has_account_url ^ has_conn_str):
        raise ValueError(
            "Must provide either a connection string or account URL, but not both."
        )
    return values

Different Permission / Scope required between an Azure Agent and Azure Worker

Expectation / Proposal

When deploying with an agent and an AzureContainerInstanceJob, the Azure Credentials that are necessary to configure and provision infrastructure are different.
The expectation is these should match - if they are not intended to match, then potentially some description and expectation of the changed behavior, and new permission requirements necessary, as this causes some security concerns in capabilities with customers.

The permissions and scope required across for an agent is only write (and delete) scope across container-instances.
https://learn.microsoft.com/en-us/azure/container-instances/

The permissions and scope required across workers however, is write scope across ARM - while for Prefect we just use this to create an underlying container group as an ARM template, the implications, is that this is a VERY broad capability that can be used to provision anything Azure Resource Manager can provision (which can be much more than just a container group).
https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/azure-services-resource-providers

Traceback / Example

Using a custom scoped service principal with the following permissions, we can see two things occurring:

  • The container creation is successful in the resource group (the flow running is not relevant to this issue, solely the infrastructure creation)
  • Viewing the deployment screen, we can see this did not create or write an ARM deployment.
az role definition create --role-definition @roledefinition.json
Readonly attribute type will be ignored in class <class 'azure.mgmt.authorization.v2022_04_01.models._models_py3.RoleDefinition'>
{
  "assignableScopes": [
    "/subscriptions/02c18f1f-d1b0-4674-8400-35ae81149ba1"
  ],
  "description": "Can manage container instances",
  "id": "/subscriptions/02c18f1f-d1b0-4674-8400-35ae81149ba1/providers/Microsoft.Authorization/roleDefinitions/5e4a82da-8f4f-4aef-acaa-b3cba142c61b",
  "name": "5e4a82da-8f4f-4aef-acaa-b3cba142c61b",
  "permissions": [
    {
      "actions": [
        "Microsoft.ContainerInstance/containerGroups/*",
        "Microsoft.Resources/subscriptions/resourceGroups/read"

      ],
      "dataActions": [],
      "notActions": [],
      "notDataActions": []
    }
  ],
  "roleName": "Container Instances Contributor",
  "roleType": "CustomRole",
  "type": "Microsoft.Authorization/roleDefinitions"
}

image
image

Conversely, using this very same user (credentials are stored as aci-test-creds block) for the work-pool, crashes with the following traceback:

azure.core.exceptions.HttpResponseError: (AuthorizationFailed) The client 'c98627c0-8904-43fe-a0ee-7be160276a9d' with object id 'c98627c0-8904-43fe-a0ee-7be160276a9d' does not have authorization to perform action 'Microsoft.Resources/deployments/write' over scope '/subscriptions/02c18f1f-d1b0-4674-8400-35ae81149ba1/resourcegroups/aci-prefect-agent/providers/Microsoft.Resources/deployments/prefect-hello-world-fbfa981c-54a8-44d1-ba45-735a09baa807' or the scope is invalid. If access was recently granted, please refresh your credentials.
Code: AuthorizationFailed
Message: The client 'c98627c0-8904-43fe-a0ee-7be160276a9d' with object id 'c98627c0-8904-43fe-a0ee-7be160276a9d' does not have authorization to perform action 'Microsoft.Resources/deployments/write' over scope '/subscriptions/02c18f1f-d1b0-4674-8400-35ae81149ba1/resourcegroups/aci-prefect-agent/providers/Microsoft.Resources/deployments/prefect-hello-world-fbfa981c-54a8-44d1-ba45-735a09baa807' or the scope is invalid. If access was recently granted, please refresh your credentials.
14:40:16.283 | INFO    | prefect.flow_runs.worker - Completed submission of flow run 'fbfa981c-54a8-44d1-ba45-735a09baa807'
14:40:16.380 | INFO    | prefect.flow_runs.worker - Reported flow run 'fbfa981c-54a8-44d1-ba45-735a09baa807' as crashed: Flow run could not be submitted to infrastructure

Create Azure Blob Storage Push Step

Add a project step that allows users to push their project to an Azure Blob Storage container at a given path. Should accept credentials that can be used for authentication with Azure.

AzureStorageBlobContainer not working as PREFECT_DEFAULT_RESULT_STORAGE_BLOCK

Expectation / Proposal

I want to migrate my existing azure/storage to azure-blob-storage-container/storage and use it with PREFECT_DEFAULT_RESULT_STORAGE_BLOCK

Traceback / Example

I have configured a AzureBlobStorageContainer block using:

        from os import environ
        from prefect_azure import AzureBlobStorageCredentials
        from prefect_azure.blob_storage import AzureBlobStorageContainer

        credentials = AzureBlobStorageCredentials(
          account_url=f"https://{environ['STORAGE_ACCOUNT']}.blob.core.windows.net",
        )
        credentials.save(name="result-storage-credentials", overwrite=True)

        block = AzureBlobStorageContainer(
          container_name="results",
          credentials=credentials,
        )
        block.save(name="result-storage", overwrite=True)

In my kubernetes base jobs template, I have changed my setting for PREFECT_DEFAULT_RESULT_STORAGE_BLOCK
from azure/result-storage to azure-blob-storage-container/result-storage

However, when I run a flow I get the following error:

Crash detected! Execution was interrupted by an unexpected exception: KeyError: "No class found for dispatch key 'azure-blob-storage-container' in registry for type 'Block'."

My prefect version is 2.16.4.

Missing pip dependency causes failure in prefect_azure.deployments.steps.pull_from_azure_blob_storage

I was following along with the documentation to use Azure Blob storage as a push and pull storage location for flow code in prefect.yaml. Unfortunately due to a missing dependency this throws an error. I was able to manually issue the correct pip install commands to resolve it, but ideally this would be included in requirements.txt for prefect-azure. This happened with prefect-azure 0.2.11.

Please consider adding azure-storage-blob to requirements.txt so that this is picked up by containers automatically when they attempt to retrieve code.

Here is the traceback message:

Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/prefect/deployments/steps/core.py", line 122, in run_steps
step_output = await run_step(step, upstream_outputs)
File "/usr/local/lib/python3.9/dist-packages/prefect/deployments/steps/core.py", line 92, in run_step
step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
File "/usr/local/lib/python3.9/dist-packages/prefect/deployments/steps/core.py", line 60, in _get_function_for_step
step_func = import_object(fully_qualified_name)
File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/importtools.py", line 212, in import_object
module = load_module(module_name)
File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/importtools.py", line 183, in load_module
return importlib.import_module(module_name)
File "/usr/lib/python3.9/importlib/init.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "", line 1030, in _gcd_import
File "", line 1007, in _find_and_load
File "", line 986, in _find_and_load_unlocked
File "", line 680, in _load_unlocked
File "", line 790, in exec_module
File "", line 228, in _call_with_frames_removed
File "/usr/local/lib/python3.9/dist-packages/prefect_azure/deployments/steps.py", line 37, in
from azure.storage.blob import ContainerClient
ModuleNotFoundError: No module named 'azure.storage'

Here is how it's defined in prefect.yaml:

pull:
- prefect_azure.deployments.steps.pull_from_azure_blob_storage:
    id: pull_code
    requires: prefect-azure>=0.2.8
    container: '{{ push_code.container }}'
    folder: '{{ push_code.folder }}'
    credentials: '{{ prefect.blocks.azure-blob-storage-credentials.flow-storage }}'

Thank you!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.