prefect-azure's Issues

aiohttp error when running flows

Error description

The changes in #97 have resulted in the following error on the agent when trying to run a flow:
ImportError: aiohttp package is not installed

Proposed solution

I think this should be fixed by adding the aiohttp package to prefect-azure's requirements.txt. It's already in requirements-dev.txt, which explains why this wasn't caught during development. I'm currently unable to test this properly, otherwise I would have created a PR.

This is what azure-identity has to say about using azure.identity.aio:

This library includes a set of async APIs. To use the async credentials in azure.identity.aio, you must first install an async transport, such as aiohttp.

Workaround

We are currently attempting to work around the problem by explicitly adding aiohttp to our agent's list of requirements.

This seems to solve the above issue. However, we have now run into a different error, which I think is unrelated but which we also had not seen before upgrading from 0.2.6 to 0.2.8:

09:46:50.965 | INFO    | prefect.agent - Submitting flow run 'guid-removed'
09:46:53.772 | ERROR   | prefect.agent - Failed to submit flow run ''guid-removed' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 500, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/usr/local/lib/python3.8/site-packages/prefect_azure/container_instance.py", line 336, in run
    container_group = self._configure_container_group(container)
  File "/usr/local/lib/python3.8/site-packages/prefect_azure/container_instance.py", line 542, in _configure_container_group
    resource_group = resource_group_client.resource_groups.get(
  File "/usr/local/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 76, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/azure/mgmt/resource/resources/v2022_09_01/operations/_operations.py", line 11671, in get
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 202, in run
    return first_node.send(pipeline_request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 70, in send
    response = self.next.send(request)
  [Previous line repeated 2 more times]
  File "/usr/local/lib/python3.8/site-packages/azure/mgmt/core/policies/_base.py", line 46, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 156, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_retry.py", line 448, in send
    response = self.next.send(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 111, in send
    self.on_request(request)
  File "/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 91, in on_request
    self._update_headers(request.http_request.headers, self._token.token)
AttributeError: 'coroutine' object has no attribute 'token'

User Assigned Identities non-functional

When a user-assigned identity is added through the work pool, it ends up as a property of "resources.properties" in the generated template.

Expectation / Proposal

According to the ARM template reference, identity should be a direct property of "resources", not of "resources.properties".

Traceback / Example

The request content was invalid and could not be deserialized: 'Could not find member 'identity' on object of type 'ContainerGroupPropertiesDefinition'. Path 'properties.identity', line 1, position 815.'."
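A minimal sketch of the shape change described above, using plain dictionaries in place of the real work pool template. The helper name `hoist_identity` and the example resource are hypothetical; only the `identity` and `properties` keys come from the issue and the error message.

```python
import copy


def hoist_identity(resource: dict) -> dict:
    """Return a copy of an ARM resource with 'identity' moved out of 'properties'.

    Hypothetical helper for illustration; not part of prefect-azure.
    """
    fixed = copy.deepcopy(resource)
    # Remove the misplaced key if it exists under "properties".
    identity = fixed.get("properties", {}).pop("identity", None)
    if identity is not None:
        # Place it where the deserializer expects it: directly on the resource.
        fixed["identity"] = identity
    return fixed


# Example resource shaped like the failing request (values are placeholders).
broken = {
    "type": "Microsoft.ContainerInstance/containerGroups",
    "properties": {
        "containers": [],
        "identity": {"type": "UserAssigned"},
    },
}
fixed = hoist_identity(broken)
```

After the call, `fixed["identity"]` sits next to `fixed["properties"]`, and `broken` is left unchanged.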

Logic to keep ACI deployment name within Azure character limit error

Expectation / Proposal

The logic that ensures the ACI deployment name stays within Azure's character limit applies len() to a UUID without first casting it to a string. The issue is in workers/container_instance.py.

Traceback / Example

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.10/site-packages/prefect_azure/workers/container_instance.py", line 571, in run
    max_length=55 - len(flow_run.id),
TypeError: object of type 'UUID' has no len()
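The failure and the likely fix can be reproduced with the standard library alone. This is a sketch, not the worker's actual code: the function name `remaining_name_budget` is hypothetical, and the limit of 55 is taken from the traceback above.

```python
import uuid


def remaining_name_budget(flow_run_id: uuid.UUID, limit: int = 55) -> int:
    """Characters left for the rest of the name after the flow run ID.

    Casting to str first avoids the TypeError, because UUID objects
    do not implement __len__.
    """
    return limit - len(str(flow_run_id))


flow_run_id = uuid.uuid4()

try:
    len(flow_run_id)  # what the worker currently does
    raised = False
except TypeError:
    raised = True

budget = remaining_name_budget(flow_run_id)
```

The canonical string form of a UUID is 36 characters long, so with a limit of 55 the remaining budget is 19 characters.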

Azure container instance worker needs image field but it's not required

Expectation / Proposal

If I leave the image field empty in my Azure Container Instance worker, I get the error below (in the state message) when I try to run a flow.

There's a simple workaround: if I add an image using the work pool edit page in the UI, the flow runs successfully.

Traceback / Example

Submission failed. Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/azure/core/polling/base_polling.py", line 466, in run
    self._poll()
  File "/usr/local/lib/python3.10/site-packages/azure/core/polling/base_polling.py", line 500, in _poll
    raise OperationFailed("Operation failed or canceled")
azure.core.polling.base_polling.OperationFailed: Operation failed or canceled

During handling of the above exception, another exception occurred:

azure.core.exceptions.HttpResponseError: (DeploymentFailed) At least one resource deployment operation failed. Please list deployment operations for details. Please see https://aka.ms/arm-deployment-operations for usage details.
Code: DeploymentFailed
Message: At least one resource deployment operation failed. Please list deployment operations for details. Please see https://aka.ms/arm-deployment-operations for usage details.
Target: /subscriptions/08de200a-6be9-4a50-8a7d-a19b3af79951/resourceGroups/azure-worker/providers/Microsoft.Resources/deployments/prefect-hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b
Exception Details: (InvalidContainerImage) The image cannot be empty for container 'eta7hvevnmuwo' in container group 'hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b'.
    Code: InvalidContainerImage
    Message: The image cannot be empty for container 'eta7hvevnmuwo' in container group 'hi_flow-6a002e96-f274-42c4-9825-ba482a7bb15b'.

AzureContainerInstanceJob doesn't support Managed Identity to deploy Container Instance for authenticated Image Registry

Expectation / Proposal

AzureContainerInstanceJob only allows the use of username and password for authentication to container registry. Prefect should allow for using Managed Identities to authenticate to Azure Container Registry (ACR). This would allow for agents/flows running on Azure based compute to leverage Azure AD for authentication. This is especially useful in organizations that want to limit access to admin credentials that are used in ACR.

I propose adding an option to the AzureContainerInstanceJob block that allows a User Assigned Managed Identity to be used with Azure Container Registry.
Suggestion: a new azure_container_registry option that accepts either a username/password pair or an Azure Managed Identity.
The _configure_container_group method would then be updated to handle both options, with image_registry remaining the default for backwards compatibility.
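As a rough sketch of the proposal above, the two authentication modes could be reduced to one helper that builds the registry credential entry. The function name `build_registry_credential` and its parameters are hypothetical; the keys `server`, `username`, `password`, and `identity` follow the ImageRegistryCredential fields in the references below, which is an assumption worth checking against the SDK version in use.

```python
from typing import Dict, Optional


def build_registry_credential(
    server: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    identity: Optional[str] = None,
) -> Dict[str, str]:
    """Build one image registry credential entry (hypothetical helper).

    Exactly one mode must be chosen: username and password together,
    or the resource ID of a user-assigned managed identity.
    """
    has_basic = username is not None and password is not None
    has_identity = identity is not None
    if has_basic == has_identity:
        raise ValueError(
            "Provide either a username and password or a managed identity, "
            "and not both."
        )
    if has_identity:
        return {"server": server, "identity": identity}
    return {"server": server, "username": username, "password": password}


# Placeholder values; the identity string stands in for a full resource ID.
with_identity = build_registry_credential(
    "myregistry.azurecr.io", identity="<identity-resource-id>"
)
with_password = build_registry_credential(
    "myregistry.azurecr.io", username="user", password="secret"
)
```

Keeping the username/password branch unchanged is what would preserve backwards compatibility with the existing image_registry behaviour.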

References -

  1. Azure Python SDK ImageRegistryCredential Class- https://learn.microsoft.com/en-us/python/api/azure-mgmt-containerinstance/azure.mgmt.containerinstance.models.imageregistrycredential?view=azure-python
  2. Rest API Reference - https://learn.microsoft.com/en-us/rest/api/container-instances/container-groups/create-or-update?tabs=HTTP#imageregistrycredential
  3. ARM Deployment Example - https://learn.microsoft.com/en-us/azure/container-instances/using-azure-container-registry-mi#deploy-using-an-azure-resource-manager-arm-template

Traceback / Example

Deployment step for azure breaks with 'DefaultAzureCredential.get_token' was never awaited

Expectation / Proposal

I am using prefect deploy with prefect_azure.deployments.steps.push_to_azure_blob_storage.
For the credentials I don't want to use a connection string but an account URL, as below:
credentials:
    account_url: https://xxx.blob.core.windows.net/

When doing so this leads to usage of DefaultAzureCredential() - which will start an authentication flow as per description in
https://github.com/PrefectHQ/prefect-azure/blob/main/prefect_azure/deployments/steps.py

    Push to an Azure Blob Storage container using an account URL and
    default credentials:
    ```yaml
    push:
        - prefect_azure.deployments.steps.push_to_azure_blob_storage:
            requires: prefect-azure[blob_storage]
            container: my-container
            folder: my-folder
            credentials:
                account_url: https://myaccount.blob.core.windows.net/
    ```

Expectation is that this will authenticate.
However, steps.py imports DefaultAzureCredential from azure.identity.aio (the async variant), while the step itself, the container client, and the rest of the code are synchronous.

from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob import ContainerClient

This is why we end up with the "never awaited" error.

The problem can be very easily fixed by changing

from azure.identity.aio import DefaultAzureCredential

to its sync counterpart:

from azure.identity import DefaultAzureCredential
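The mismatch can be illustrated without any Azure packages. In the sketch below, `SyncCredential`, `AsyncCredential`, and `sync_auth_header` are hypothetical stand-ins for the sync and async credential classes and for the sync authentication policy; they are not the real Azure APIs, but they fail in the same way as the traceback in this issue.

```python
import warnings
from collections import namedtuple

# Stand-in for the token object a credential returns.
AccessToken = namedtuple("AccessToken", ["token", "expires_on"])


class SyncCredential:
    """Stand-in for a synchronous credential."""

    def get_token(self, *scopes):
        return AccessToken("sync-token", 0)


class AsyncCredential:
    """Stand-in for an async credential: get_token is a coroutine function."""

    async def get_token(self, *scopes):
        return AccessToken("async-token", 0)


def sync_auth_header(credential):
    """Mimics a synchronous pipeline policy: it calls get_token without await."""
    token = credential.get_token("scope")
    return "Bearer " + token.token


def demo():
    ok = sync_auth_header(SyncCredential())
    error = None
    with warnings.catch_warnings():
        # The un-awaited coroutine also triggers a "never awaited"
        # RuntimeWarning; it is silenced here to keep the output clean.
        warnings.simplefilter("ignore", RuntimeWarning)
        try:
            sync_auth_header(AsyncCredential())
        except AttributeError as exc:
            error = str(exc)
    return ok, error


ok, error = demo()
```

With the async stand-in, `get_token` returns a coroutine object instead of a token, so the attribute lookup fails in synchronous code, which matches the AttributeError reported here.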

Traceback / Example

Traceback (most recent call last):
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 124, in run_steps
step_output = await run_step(step, upstream_outputs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 95, in run_step
result = await from_async.call_soon_in_new_thread(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect_azure/deployments/steps.py", line 119, in push_to_azure_blob_storage
client.upload_blob(str(remote_file_path), f, overwrite=True)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_container_client.py", line 1081, in upload_blob
blob.upload_blob(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py", line 737, in upload_blob
return upload_block_blob(**options)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
response = client.upload(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 845, in upload
pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 230, in run
return first_node.send(pipeline_request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
[Previous line repeated 2 more times]
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 197, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/storage/blob/_shared/policies.py", line 520, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 86, in send
response = self.next.send(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 124, in send
self.on_request(request)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 100, in on_request
self._update_headers(request.http_request.headers, self._token.token)
AttributeError: 'coroutine' object has no attribute 'token'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 383, in call
return self.result()
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
File "/opt/conda/envs/prefect.venv/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/deploy.py", line 257, in deploy
await _run_single_deploy(
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/cli/deploy.py", line 519, in _run_single_deploy
await run_steps(push_steps, step_outputs, print_function=app.console.print)
File "/opt/conda/envs/prefect.venv/lib/python3.8/site-packages/prefect/deployments/steps/core.py", line 152, in run_steps
raise StepExecutionError(f"Encountered error while running {fqn}") from exc
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect_azure.deployments.steps.push_to_azure_blob_storage
An exception occurred.
sys:1: RuntimeWarning: coroutine 'DefaultAzureCredential.get_token' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Providing no credentials to `AzureBlobStorageCredentials` raises wrong error message

When I don't provide anything to my AzureBlobStorageCredentials:

e.g.

credentials = AzureBlobStorageCredentials()

I get this error:

❯ python qa.py
Traceback (most recent call last):
  File "/Users/bean/code-oss/prefect-azure/qa.py", line 6, in <module>
    credentials = AzureBlobStorageCredentials(
  File "/Users/bean/code-oss/prefect-azure/azurevenv/lib/python3.10/site-packages/prefect/blocks/core.py", line 265, in __init__
    super().__init__(*args, **kwargs)
  File "/Users/bean/code-oss/prefect-azure/azurevenv/lib/python3.10/site-packages/pydantic/v1/main.py", line 341, in __init__
    raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for AzureBlobStorageCredentials
__root__
  Must provide either a connection string or account URL, but not both. (type=value_error)

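This happens because the validator only does an XOR check. It does not distinguish between neither value being provided and both values being provided, so both cases raise the same "but not both" message.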

@root_validator
def check_connection_string_or_account_url(
    cls, values: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Checks that either a connection string or account URL is provided, not both.
    """
    has_account_url = values.get("account_url") is not None
    has_conn_str = values.get("connection_string") is not None
    if not bool(has_account_url ^ has_conn_str):
        raise ValueError(
            "Must provide either a connection string or account URL, but not both."
        )
    return values
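One possible way to separate the two failure cases is sketched below as a plain function, so that it runs without pydantic. The wording of the message for the "neither" case is an assumption, not the library's text.

```python
from typing import Any, Dict


def check_connection_string_or_account_url(values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate that exactly one of the two fields is set.

    Standalone sketch of the root validator body, with a separate
    message for the case where nothing was provided.
    """
    has_account_url = values.get("account_url") is not None
    has_conn_str = values.get("connection_string") is not None
    if not has_account_url and not has_conn_str:
        # Assumed wording for the missing-credentials case.
        raise ValueError("Must provide either a connection string or an account URL.")
    if has_account_url and has_conn_str:
        raise ValueError(
            "Must provide either a connection string or account URL, but not both."
        )
    return values


def error_for(values: Dict[str, Any]) -> str:
    """Return the validation message, or an empty string if valid."""
    try:
        check_connection_string_or_account_url(values)
    except ValueError as exc:
        return str(exc)
    return ""
```

Calling `error_for({})` now reports that something is missing, while passing both fields still produces the original "but not both" message.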

Make the credentials optional for the AzureBlobStorageCredentials Block or for blob interaction in general.

Many users want to configure credentials at the resource level rather than creating a credential and entrusting it to Prefect. For example, we commonly see that Azure KeyVault is used to store secrets and users prefer to only have their secrets stored in one place, so enabling these blocks to function without storing secret values in Prefect is important.

Related to #68.

Make it possible to use blob_storage_download, blob_storage_upload, blob_storage_list and other Azure Storage interactions without storing credentials or secrets in Prefect.

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

Worker creates invalid container names

Currently, using the worker to run deployments is broken unless the flow has a short and valid name. This is not a problem when running an old-fashioned agent.

prefect_azure/workers/container_instance.py doesn't do any sort of sanity checking when it comes to creating a container instance name, such as removing spaces or ensuring the name length is within Azure's limits. The invalid logic resides here: https://github.com/PrefectHQ/prefect-azure/blob/8f92ab76cb897eded99eeeed304517959a2caad6/prefect_azure/workers/container_instance.py#L552C46-L552C46

I think it would be better if the naming logic worked exactly as it does in prefect_azure/container_instance.py. That implementation lets the user set a name rather than hard-coding it from the flow name, does not add a prefect- prefix, appends 10 random characters instead of a GUID, and finally ensures that the name isn't too long for Azure (64 characters).
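A minimal sketch of the kind of sanitising described above. The function name `make_container_group_name` is hypothetical, and the character rules (lowercase letters, digits, and single hyphens) are an assumption about Azure's naming rules that should be checked against the documentation. The default limit is set to 63 as a conservative choice; the issue quotes 64.

```python
import re
import secrets
import string


def make_container_group_name(
    base_name: str, max_length: int = 63, suffix_length: int = 10
) -> str:
    """Build a container group name from a user-supplied or flow-derived name.

    Hypothetical helper: lowercases the input, collapses every run of
    other characters into one hyphen, truncates, and appends a random
    suffix so that repeated runs do not collide.
    """
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join(secrets.choice(alphabet) for _ in range(suffix_length))
    slug = re.sub(r"[^a-z0-9]+", "-", base_name.lower()).strip("-")
    # Leave room for the joining hyphen and the random suffix.
    slug = slug[: max_length - suffix_length - 1].rstrip("-")
    return f"{slug}-{suffix}" if slug else suffix


name = make_container_group_name("My Flow With Spaces And A Very Long Name " * 4)
```

The result never exceeds `max_length`, contains no spaces, and never starts or ends with a hyphen.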

Authentication issue when using DefaultAzureCredential for blob storage operations

Expectation / Proposal

The prefect-azure docs mention that for authentication for blob storage operations, you can choose to give the connection string, or leave it out and give the account url (in which case DefaultAzureCredential will be used). The latter is nice because it's passwordless.

However, this does not seem to work?

Azure's library throws an exception: TypeError: object AccessToken can't be used in 'await' expression.
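This TypeError is what Python raises when an async code path awaits the result of a synchronous `get_token`. The sketch below reproduces that with hypothetical stand-in classes (`SyncCredential`, `AsyncCredential`, `async_auth_header`); it does not use the real Azure clients, and whether this is the exact cause inside prefect-azure is an assumption based on the message.

```python
import asyncio
from collections import namedtuple

# Stand-in for the token object a credential returns.
AccessToken = namedtuple("AccessToken", ["token", "expires_on"])


class SyncCredential:
    """Stand-in for a synchronous credential."""

    def get_token(self, *scopes):
        return AccessToken("sync-token", 0)


class AsyncCredential:
    """Stand-in for an async credential."""

    async def get_token(self, *scopes):
        return AccessToken("async-token", 0)


async def async_auth_header(credential):
    """Mimics an async pipeline policy: it awaits get_token."""
    token = await credential.get_token("scope")
    return "Bearer " + token.token


def try_credential(credential):
    """Run the async policy and report either the header or the TypeError."""
    try:
        return asyncio.run(async_auth_header(credential))
    except TypeError as exc:
        return f"TypeError: {exc}"


async_result = try_credential(AsyncCredential())
sync_result = try_credential(SyncCredential())
```

The async stand-in works, while the sync one fails because a plain AccessToken object is not awaitable. That suggests the async blob client here is being handed a synchronous credential.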

Traceback / Example

Copied the official example, and swapped out the connection values:

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

@flow
def example_blob_storage_upload_flow():
    blob_storage_credentials = AzureBlobStorageCredentials(
        account_url="https://xxx.blob.core.windows.net/",
    )

    with open("data.csv", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="etadata",
            blob="data.csv",
            blob_storage_credentials=blob_storage_credentials,
            overwrite=False,
        )
    return blob

example_blob_storage_upload_flow()

->

Traceback
C:\Users\staljaard\Source\onlyazuretest\.venv\Scripts\python.exe C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py 
15:17:15.454 | INFO    | prefect.engine - Created flow run 'straight-wolf' for flow 'example-blob-storage-upload-flow'
15:17:15.637 | INFO    | Flow run 'straight-wolf' - Created task run 'blob_storage_upload-0' for task 'blob_storage_upload'
15:17:15.638 | INFO    | Flow run 'straight-wolf' - Executing 'blob_storage_upload-0' immediately...
15:17:15.727 | INFO    | Task run 'blob_storage_upload-0' - Uploading blob to container etadata with key data.csv
15:17:18.483 | ERROR   | Task run 'blob_storage_upload-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
15:17:18.533 | ERROR   | Task run 'blob_storage_upload-0' - Finished in state Failed("Task run encountered an exception: TypeError: object AccessToken can't be used in 'await' expression\n")
15:17:18.535 | ERROR   | Flow run 'straight-wolf' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 703, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 17, in example_blob_storage_upload_flow
    blob = blob_storage_upload(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\tasks.py", line 494, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1006, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1171, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
15:17:18.691 | ERROR   | Flow run 'straight-wolf' - Finished in state Failed("Flow run encountered an exception. TypeError: object AccessToken can't be used in 'await' expression\n")
Traceback (most recent call last):
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 26, in <module>
    example_blob_storage_upload_flow()
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\flows.py", line 478, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\client\utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 258, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 703, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\onlyazuretest\azure_upload_flow.py", line 17, in example_blob_storage_upload_flow
    blob = blob_storage_upload(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\tasks.py", line 494, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1006, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 138, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1171, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\engine.py", line 1584, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\prefect_azure\blob_storage.py", line 117, in blob_storage_upload
    await blob_client.upload_blob(data, overwrite=overwrite)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 413, in upload_blob
    return await upload_block_blob(**options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\aio\_upload_helpers.py", line 83, in upload_block_blob
    response = await client.upload(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 237, in upload
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 204, in run
    return await first_node.send(pipeline_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_base_async.py", line 70, in send
    response = await self.next.send(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 3 more times]
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 74, in send
    await await_result(self.on_request, request)
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\_tools_async.py", line 37, in await_result
    return await result  # type: ignore
           ^^^^^^^^^^^^
  File "C:\Users\staljaard\Source\onlyazuretest\.venv\Lib\site-packages\azure\core\pipeline\policies\_authentication_async.py", line 52, in on_request
    self._token = await self._credential.get_token(*self._scopes)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object AccessToken can't be used in 'await' expression
This happens regardless of whether the task is called synchronously or asynchronously.
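
The last frame awaits `self._credential.get_token(...)`, so the error is consistent with a synchronous credential having been handed to an async client: a sync `get_token` returns the token object directly, and awaiting a non-awaitable raises `TypeError`. The sketch below reproduces only that mechanism, using stand-in classes. The names `SyncCredential`, `AsyncCredential`, `fetch_token`, and `try_fetch` are hypothetical and not part of azure-identity, and this is not a confirmed diagnosis of the prefect-azure code.

```python
import asyncio
from collections import namedtuple

# Stand-in for the token type; the real one lives in azure.core.credentials.
AccessToken = namedtuple("AccessToken", ["token", "expires_on"])


class SyncCredential:
    """Hypothetical sync credential: get_token returns the token directly."""

    def get_token(self, *scopes):
        return AccessToken("fake-token", 0)


class AsyncCredential:
    """Hypothetical async credential: get_token is a coroutine function."""

    async def get_token(self, *scopes):
        return AccessToken("fake-token", 0)


async def fetch_token(credential):
    # Mirrors the awaiting call in the last traceback frame.
    return await credential.get_token("https://storage.azure.com/.default")


def try_fetch(credential):
    """Return the token, or the TypeError raised by awaiting a non-awaitable."""
    try:
        return asyncio.run(fetch_token(credential))
    except TypeError as exc:
        return exc


print(try_fetch(AsyncCredential()))  # an AccessToken
print(try_fetch(SyncCredential()))   # a TypeError instance
```

If this is indeed the cause, the fix would be to pair the async blob client with a credential from `azure.identity.aio` rather than from `azure.identity`.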

(Actually I would like to but I don't have time at this moment 😑)

Make the AzureContainerInstanceCredentials creds block optional for AzureContainerInstanceJob infra block.

As you can see in the screenshot, a credentials block is required.

image

Many users want to configure credentials at the resource level rather than creating a credential and entrusting it to Prefect. For example, we commonly see Azure Key Vault used to store secrets, and users prefer to keep their secrets in one place. Enabling these blocks to function without storing secret values in Prefect is therefore important.

Create Azure Blob Storage Push Step

Add a project step that allows users to push their project to an Azure Blob Storage container at a given path. Should accept credentials that can be used for authentication with Azure.
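
A minimal sketch of what such a push step could do, assuming the caller supplies an already-authenticated container client. The function name `push_to_container` and the `InMemoryContainer` stand-in are hypothetical; the only Azure API relied on is an `upload_blob(name, data, overwrite=...)` method, which `azure.storage.blob.ContainerClient` provides.

```python
import tempfile
from pathlib import Path


def push_to_container(container_client, folder, prefix=""):
    """Upload every file under `folder` to the container, below `prefix`.

    `container_client` is any object exposing
    upload_blob(name, data, overwrite=...), for example an
    azure.storage.blob.ContainerClient built by the caller.
    """
    root = Path(folder)
    prefix = prefix.strip("/")
    uploaded = []
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        relative = path.relative_to(root).as_posix()
        blob_name = f"{prefix}/{relative}" if prefix else relative
        with path.open("rb") as handle:
            container_client.upload_blob(blob_name, handle, overwrite=True)
        uploaded.append(blob_name)
    return uploaded


class InMemoryContainer:
    """Hypothetical stand-in client so the sketch runs without Azure."""

    def __init__(self):
        self.blobs = {}

    def upload_blob(self, name, data, overwrite=False):
        if name in self.blobs and not overwrite:
            raise FileExistsError(name)
        self.blobs[name] = data.read()


with tempfile.TemporaryDirectory() as project:
    Path(project, "flows").mkdir()
    Path(project, "flows", "etl.py").write_bytes(b"print('hello')")
    Path(project, "README.md").write_bytes(b"demo")
    fake = InMemoryContainer()
    pushed = push_to_container(fake, project, prefix="my-project")

print(sorted(pushed))
```

Taking the client as a parameter leaves the choice of credentials to the caller.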

Create Azure Storage Pull Step

Add a project step that allows users to pull their project from an Azure Blob Storage container at a given path. Should accept credentials that can be used for authentication with Azure.

AzureBlobStorageContainer not working as PREFECT_DEFAULT_RESULT_STORAGE_BLOCK
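
A minimal sketch of the corresponding pull, again assuming an already-authenticated client is passed in. `pull_from_container` and `InMemoryContainer` are hypothetical names; the client only needs `list_blobs(name_starts_with=...)` yielding objects with a `.name`, and `download_blob(name).readall()`, both of which `azure.storage.blob.ContainerClient` offers.

```python
import tempfile
from pathlib import Path
from types import SimpleNamespace


def pull_from_container(container_client, prefix, destination):
    """Download every blob below `prefix` into `destination`."""
    dest = Path(destination)
    prefix = prefix.strip("/")
    # A trailing slash keeps "my-project" from matching "my-project-2/...".
    listing_prefix = f"{prefix}/" if prefix else None
    written = []
    for blob in container_client.list_blobs(name_starts_with=listing_prefix):
        relative = blob.name[len(listing_prefix or ""):]
        if not relative:
            continue
        target = dest / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(container_client.download_blob(blob.name).readall())
        written.append(relative)
    return written


class InMemoryContainer:
    """Hypothetical stand-in client so the sketch runs without Azure."""

    def __init__(self, blobs):
        self.blobs = blobs

    def list_blobs(self, name_starts_with=None):
        for name in sorted(self.blobs):
            if name_starts_with is None or name.startswith(name_starts_with):
                yield SimpleNamespace(name=name)

    def download_blob(self, name):
        return SimpleNamespace(readall=lambda: self.blobs[name])


remote = InMemoryContainer({
    "my-project/README.md": b"demo",
    "my-project/flows/etl.py": b"print('hello')",
    "my-project-2/other.txt": b"unrelated",
})

with tempfile.TemporaryDirectory() as workdir:
    pulled = pull_from_container(remote, "my-project", workdir)
    etl_source = Path(workdir, "flows", "etl.py").read_bytes()

print(pulled)
```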

Expectation / Proposal

I want to migrate my existing azure/storage to azure-blob-storage-container/storage and use it with PREFECT_DEFAULT_RESULT_STORAGE_BLOCK

Traceback / Example

I have configured an AzureBlobStorageContainer block using:

        from os import environ
        from prefect_azure import AzureBlobStorageCredentials
        from prefect_azure.blob_storage import AzureBlobStorageContainer

        credentials = AzureBlobStorageCredentials(
          account_url=f"https://{environ['STORAGE_ACCOUNT']}.blob.core.windows.net",
        )
        credentials.save(name="result-storage-credentials", overwrite=True)

        block = AzureBlobStorageContainer(
          container_name="results",
          credentials=credentials,
        )
        block.save(name="result-storage", overwrite=True)

In my Kubernetes base job template, I have changed my setting for PREFECT_DEFAULT_RESULT_STORAGE_BLOCK
from azure/result-storage to azure-blob-storage-container/result-storage.

However, when I run a flow I get the following error:

Crash detected! Execution was interrupted by an unexpected exception: KeyError: "No class found for dispatch key 'azure-blob-storage-container' in registry for type 'Block'."

My prefect version is 2.16.4.
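
The `KeyError` says that the dispatch key saved with the block has no matching class registered in the running process. One possible cause (an assumption, not confirmed in this report) is that `prefect_azure` is not installed or imported in the environment where the flow runs, so the class never registers itself. The toy registry below is not Prefect's implementation; `REGISTRY`, `lookup`, and `define_container_block` are hypothetical names, and the sketch only illustrates how a lookup can fail until the defining module has been imported.

```python
REGISTRY = {}


class Block:
    """Toy base class: subclasses register themselves under a dispatch key."""

    dispatch_key = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if cls.dispatch_key:
            REGISTRY[cls.dispatch_key] = cls


def lookup(key):
    """Return the class registered for `key`, or raise a descriptive KeyError."""
    try:
        return REGISTRY[key]
    except KeyError:
        raise KeyError(f"No class found for dispatch key {key!r}") from None


def define_container_block():
    """Stands in for importing the module that defines the block class."""

    class AzureBlobStorageContainer(Block):
        dispatch_key = "azure-blob-storage-container"

    return AzureBlobStorageContainer


# Before the "import", the key is unknown and the lookup fails.
try:
    lookup("azure-blob-storage-container")
    found_before = True
except KeyError:
    found_before = False

# After the class has been defined, the same lookup succeeds.
block_cls = define_container_block()
found_after = lookup("azure-blob-storage-container") is block_cls

print(found_before, found_after)
```

Under that assumption, checking that `prefect-azure` is present in the image used by the Kubernetes job would be a reasonable first step.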
