Comments (12)

alangenfeld commented on May 31, 2024

Does this reproduce for you on more recent versions as well?

paulBurnsUpside commented on May 31, 2024

Yes it does; I have tested up through Dagster 1.6.0.

alangenfeld commented on May 31, 2024

I have been unable to reproduce with AssetMaterialization events that were written in 1.5.5, 1.5.6, or master. These all correctly return information from get_latest_materialization_code_versions in 1.5.5, 1.5.6, and master.
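
Roughly, the check I have been running looks like the following sketch (the asset key is a placeholder for one of the affected assets, and it assumes DAGSTER_HOME points at the instance that recorded the materializations):

from dagster import AssetKey, DagsterInstance

# assumes DAGSTER_HOME points at the instance that recorded the materializations
instance = DagsterInstance.get()

# "my_asset" is a placeholder; substitute one of the affected asset keys
code_versions = instance.get_latest_materialization_code_versions([AssetKey("my_asset")])
print(code_versions)  # mapping of AssetKey to the stored code version (or None)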

I have looked through the changes between 1.5.5 and 1.5.6 and haven't been able to find any leads.

Is there any other context about the asset materializations you are seeing this on?

paulBurnsUpside commented on May 31, 2024

Sorry, I should have stated this upfront: the assets we mainly use are dbt assets created from @dbt_assets.

alangenfeld commented on May 31, 2024

Can you share roughly what your @dbt_assets usage looks like, in case that information is relevant?

paulBurnsUpside commented on May 31, 2024

Yeah I don't mind sharing that.

from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator, DagsterDbtTranslatorSettings
from dagster import file_relative_path, Output, MetadataValue, Config, AssetExecutionContext, AssetKey, Failure
from dagster_slack import SlackResource
from dagster_dbt.utils import output_name_fn
from dateutil import parser
import os
from default_code_location.resources import dbt_manifest_path
from analytics_platform_dagster_utils.dbt.get_manifest import get_manifest
from typing import Any, Mapping, Optional, Dict
import re

# Helper to ensure the asset group follows Dagster naming requirements
def remove_non_alphanumeric(string):
    return re.sub(r'[^A-Za-z0-9_]+', '', string)

# Class for defining the dbt execution config - values below are the defaults but can be altered for specific jobs or from the UI launchpad
class MyDbtConfig(Config):
    run_build_or_test: str = "build"
    full_refresh: bool = False
    fail_fast: bool = False

# Class for defining the dbt translator config, customizing the metadata and asset groups
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        metadata: Dict[str, Any] = {}
        metadata["dbt_metadata"] = dbt_resource_props
        metadata["materialized"] = MetadataValue.text(dbt_resource_props.get("config", {}).get("materialized", 'n/a'))
        metadata["file_path"] = MetadataValue.path(dbt_resource_props.get("original_file_path", 'n/a'))
        metadata["contract_enforced"] = MetadataValue.bool(dbt_resource_props.get("config", {}).get("contract", {}).get("enforced", False))        
        return {
            **super().get_metadata(dbt_resource_props),
            **metadata,
        }
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        return remove_non_alphanumeric(os.getenv('SUBSYSTEM_NAME').replace('-', '_') + '_' + dbt_resource_props.get("schema", 'default'))
    @classmethod
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)
        if dbt_resource_props["unrendered_config"].get('meta', {}).get('dagster', {}).get('asset_key'):
            pass
        elif dbt_resource_props.get('loader',{})=="fivetran":
            pass
        elif dbt_resource_props["resource_type"] != "source" or dbt_resource_props["package_name"] == 'dbt_snowflake_monitoring':
            # if the asset is not a source, add the subsystem name as a prefix
            asset_key = asset_key.with_prefix(os.getenv('SUBSYSTEM_NAME').replace('-', '_'))
        else:
            # if the asset is a source (and not from dbt_snowflake_monitoring), add the source system as a prefix
            source_system = dbt_resource_props["original_file_path"].split('/')[2]
            if source_system == 'schema.yml':
                source_system=dbt_resource_props['source_name'].replace('src_', '').replace('-', '_')
            else:
                source_system = source_system.replace('contracts_', '').replace('src_', '').replace('-', '_')

            asset_key = asset_key.with_prefix(source_system)
        return asset_key
# Set up project definitions
dbt_project_path = file_relative_path(__file__, "../../dbt_ae")
manifest = get_manifest(dbt_project_path, dbt_manifest_path)


# Load definitions into dbt assets for dagster
@dbt_assets(
    manifest=manifest,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(
    context: AssetExecutionContext, dbt: DbtCliResource, slack: SlackResource, config: MyDbtConfig
):
    dbt_build_args = [config.run_build_or_test]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]
    if config.fail_fast:
        dbt_build_args += ["--fail-fast"]
    dbt_cli_task = dbt.cli(dbt_build_args, context=context, raise_on_error=False)

    events = list(dbt_cli_task.stream_raw_events())

    # Then, we can use the run results to add metadata to the outputs.
    for event in events:
        for dagster_event in event.to_default_asset_events(manifest=manifest, dagster_dbt_translator=CustomDagsterDbtTranslator()):
            if dbt_cli_task.is_successful():
                # Get the run results after the task has completed.
                run_results = dbt_cli_task.get_artifact("run_results.json")
                executed_manifest = dbt_cli_task.get_artifact("manifest.json")
                results_by_output_name = {
                    output_name_fn({"unique_id": result["unique_id"]}): result
                    for result in run_results["results"]
                }
                manifest_by_output_name = {
                    output_name_fn({"unique_id": unique_id}): node
                    for unique_id, node in executed_manifest["nodes"].items()
                }

                if isinstance(dagster_event, Output):
                    event_node_info = event.raw_event["data"]["node_info"]
                    started_at = parser.isoparse(event_node_info["node_started_at"])
                    completed_at = parser.isoparse(event_node_info["node_finished_at"])

                    output_name = dagster_event.output_name
                    result = results_by_output_name[output_name]
                    rows_affected: Optional[int] = result["adapter_response"].get(
                        "rows_affected", 0
                    )
                    rows_affected_metadata = (
                        {"rows_affected": rows_affected} if rows_affected else {"rows_affected": 0}
                    )
                    query_id: Optional[str] = result["adapter_response"].get(
                        "query_id"
                    )
                    query_id_metadata = (
                        {"query_id": query_id,"link_to_query": "https://mk43712.us-east-1.snowflakecomputing.com/console#/monitoring/queries/detail?queryId="+query_id} if query_id else {}
                    )
                    node = manifest_by_output_name[output_name]
                    compiled_sql: Optional[str] = node.get("compiled_code")
                    compiled_sql_metadata = (
                        {"compiled_sql": MetadataValue.md(compiled_sql)}
                        if compiled_sql
                        else {}
                    )

                    node_metadata = {
                        "Execution Started At": started_at.isoformat(
                            timespec="seconds"
                        ),
                        "Execution Completed At": completed_at.isoformat(
                            timespec="seconds"
                        ),
                        "Execution Duration": (
                            completed_at - started_at
                        ).total_seconds(),
                    }

                    context.add_output_metadata(
                        metadata={
                            **rows_affected_metadata,
                            **compiled_sql_metadata,
                            **node_metadata,
                            **query_id_metadata
                        },
                        output_name=output_name,
                    )               
            yield dagster_event

    if not dbt_cli_task.is_successful():
        raise Failure(description="At least one dbt model or test has failed.")

alangenfeld commented on May 31, 2024

> I see both locally and in our production deployment which runs using ECS.

Is your local deployment a long-lived one? Curious what dagster instance info looks like for both. My theory here is that you are in some old DB schema state that we don't force a migration from; dagster instance migrate is the command that upgrades the DB schema.

skwon615 commented on May 31, 2024

Hi @alangenfeld - I'm on the same team as Paul. The local deployment was just using dagster dev, so it isn't a long-lived one. Also, we ran dagster instance migrate ~3 months ago - is that too far back?

alangenfeld commented on May 31, 2024

At this point I am just speculating on possible causes as all of my attempts to reproduce have been unsuccessful.

Are all of your dagster packages on the same version?

skwon615 commented on May 31, 2024

Ah gotcha, thanks for the context :) And yes, they are all on the same version!

skwon615 commented on May 31, 2024

Hi @alangenfeld / team - just following up here! Let us know if you need any other context from us.

alangenfeld commented on May 31, 2024

Without the ability to reproduce, I am not sure how to efficiently find the root problem on our end. Some ideas on how you could dig in on your end:

  • step through using a debugger (or add print statements) for instance.get_latest_materialization_code_versions(asset_keys) in the two versions and identify precisely where things diverge
  • check out dagster locally, pip install -e python_modules/dagster, and use git bisect to identify the exact commit that breaks things. You should be able to write a script around instance.get_latest_materialization_code_versions(asset_keys) and use git bisect run (https://git-scm.com/docs/git-bisect#_bisect_run) - see the sketch after this list.
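
A minimal sketch of what that bisect script could look like (bisect_check.py is a hypothetical name, the asset key is a placeholder, and it assumes DAGSTER_HOME points at the affected instance):

# bisect_check.py - hypothetical helper script for git bisect run
import sys

from dagster import AssetKey, DagsterInstance

# assumes DAGSTER_HOME points at the instance that holds the materializations
instance = DagsterInstance.get()

# placeholder: substitute one of the affected dbt asset keys
asset_keys = [AssetKey(["my_prefix", "my_dbt_model"])]
code_versions = instance.get_latest_materialization_code_versions(asset_keys)
print(code_versions)

# git bisect run treats exit code 0 as good and 1 as bad, so exit 0 when code
# versions come back and 1 when they are all missing
sys.exit(0 if all(v is not None for v in code_versions.values()) else 1)

With 1.5.5 marked good and 1.5.6 marked bad, git bisect run python bisect_check.py should land on the offending commit.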
