Comments (12)
Does this reproduce for you on more recent versions as well?
from dagster.
Yes it does, I have tested through Dagster 1.6.0
from dagster.
I have been unable to reproduce this with `AssetMaterialization` events written in 1.5.5, 1.5.6, or master. In all of these cases, `get_latest_materialization_code_versions` correctly returns information in 1.5.5, 1.5.6, and master.
I have looked through the changes between 1.5.5
and 1.5.6
and haven't been able to find any leads.
Is there any other context about the asset materializations you are seeing this on?
from dagster.
Sorry, I should have stated this up front: the assets we mainly use are dbt assets created with `@dbt_assets`.
from dagster.
Can you share roughly what your `@dbt_assets` usage looks like, in case that information is relevant?
from dagster.
Yeah I don't mind sharing that.
from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator, DagsterDbtTranslatorSettings
from dagster import file_relative_path, Output, MetadataValue, Config, AssetExecutionContext, AssetKey, Failure
from dagster_slack import SlackResource
from dagster_dbt.utils import output_name_fn
from dateutil import parser
import os
from default_code_location.resources import dbt_manifest_path
from analytics_platform_dagster_utils.dbt.get_manifest import get_manifest
from typing import Any, Mapping, Optional, Dict
import re
# Helper to ensure the asset group follows Dagster naming requirements
_NON_NAME_CHARS = re.compile(r'[^A-Za-z0-9_]+')


def remove_non_alphanumeric(string):
    """Return ``string`` with every character outside ``[A-Za-z0-9_]`` removed.

    Disallowed characters (hyphens, spaces, dots, non-ASCII letters, ...) are
    dropped rather than replaced, so ``"my-group.v2"`` becomes ``"mygroupv2"``.
    """
    return _NON_NAME_CHARS.sub('', string)
# Class for defining the dbt execution config - values below are the defaults but can be altered for specific jobs or from the UI launchpad
class MyDbtConfig(Config):
    # dbt sub-command the asset function puts first on the dbt CLI (e.g. "build" or "test").
    run_build_or_test: str = "build"
    # When True, the asset function adds dbt's `--full-refresh` flag.
    full_refresh: bool = False
    # Intended to request dbt's `--fail-fast` flag for the invocation.
    fail_fast: bool = False
# Class for defining the dbt translator config, customizing the metadata and asset groups
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    """dbt translator that customizes metadata, asset group names and asset keys.

    Group names and most asset-key prefixes are derived from the
    ``SUBSYSTEM_NAME`` environment variable, which must be set wherever these
    definitions are loaded.
    """

    # Enable dagster-dbt asset checks.
    # NOTE(review): dagster-dbt also accepts settings through the translator
    # constructor (``settings=...``); confirm the installed version honors
    # this class attribute.
    settings = DagsterDbtTranslatorSettings(enable_asset_checks=True)

    @staticmethod
    def _subsystem_name() -> str:
        """Return ``SUBSYSTEM_NAME`` with hyphens replaced by underscores.

        Raises:
            RuntimeError: if the environment variable is not set. Without this
                check the failure would be an opaque ``AttributeError`` on ``None``.
        """
        subsystem = os.getenv('SUBSYSTEM_NAME')
        if subsystem is None:
            raise RuntimeError(
                "The SUBSYSTEM_NAME environment variable must be set to build "
                "dbt asset keys and group names."
            )
        return subsystem.replace('-', '_')

    @staticmethod
    def _source_system(dbt_resource_props: Mapping[str, Any]) -> str:
        """Derive the asset-key prefix for a dbt source from where it is declared.

        The third component of ``original_file_path`` is treated as the
        source-system directory, e.g. ``models/sources/src_foo/schema.yml``
        gives ``foo``. The dbt ``source_name`` is used instead when that
        component is the ``schema.yml`` file itself, or when the path is too
        short to have a third component (previously an ``IndexError``).
        """
        parts = dbt_resource_props["original_file_path"].split('/')
        if len(parts) < 3 or parts[2] == 'schema.yml':
            return dbt_resource_props['source_name'].replace('src_', '').replace('-', '_')
        return parts[2].replace('contracts_', '').replace('src_', '').replace('-', '_')

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the base metadata extended with selected dbt resource properties.

        Adds the raw resource properties, the materialization strategy, the
        original file path and whether a contract is enforced. These custom
        keys win over base keys of the same name.
        """
        config = dbt_resource_props.get("config", {})
        metadata: Dict[str, Any] = {
            "dbt_metadata": dbt_resource_props,
            "materialized": MetadataValue.text(config.get("materialized", 'n/a')),
            "file_path": MetadataValue.path(dbt_resource_props.get("original_file_path", 'n/a')),
            "contract_enforced": MetadataValue.bool(config.get("contract", {}).get("enforced", False)),
        }
        return {
            **super().get_metadata(dbt_resource_props),
            **metadata,
        }

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Return ``<subsystem>_<schema>`` stripped to characters valid in a group name."""
        return remove_non_alphanumeric(
            self._subsystem_name() + '_' + dbt_resource_props.get("schema", 'default')
        )

    @classmethod
    def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Return the base asset key, prefixed according to the resource's origin.

        - An explicit ``meta.dagster.asset_key`` in the unrendered config means
          the base key is kept unchanged.
        - Fivetran-loaded sources keep the base key unchanged.
        - Non-source resources, and anything from the ``dbt_snowflake_monitoring``
          package, are prefixed with the subsystem name.
        - Any other source is prefixed with its source system (see ``_source_system``).
        """
        asset_key = super().get_asset_key(dbt_resource_props)

        dagster_meta = (
            dbt_resource_props.get("unrendered_config", {})
            .get('meta', {})
            .get('dagster', {})
        )
        if dagster_meta.get('asset_key'):
            return asset_key
        if dbt_resource_props.get('loader') == "fivetran":
            return asset_key

        is_source = dbt_resource_props["resource_type"] == "source"
        if not is_source or dbt_resource_props["package_name"] == 'dbt_snowflake_monitoring':
            return asset_key.with_prefix(cls._subsystem_name())

        return asset_key.with_prefix(cls._source_system(dbt_resource_props))
# Set up project Definitions
# The dbt project lives two directories above this file; its manifest drives asset generation.
dbt_project_path = file_relative_path(__file__, "../../dbt_ae")
manifest = get_manifest(
    dbt_project_path,
    dbt_manifest_path,
)
# Snowflake console page for a single query; the query id is appended to this prefix.
_SNOWFLAKE_QUERY_URL = "https://mk43712.us-east-1.snowflakecomputing.com/console#/monitoring/queries/detail?queryId="


def _load_artifacts_by_output_name(dbt_cli_task):
    """Read run_results.json and manifest.json once and index both by Dagster output name."""
    run_results = dbt_cli_task.get_artifact("run_results.json")
    executed_manifest = dbt_cli_task.get_artifact("manifest.json")
    results_by_output_name = {
        output_name_fn({"unique_id": result["unique_id"]}): result
        for result in run_results["results"]
    }
    manifest_by_output_name = {
        output_name_fn({"unique_id": unique_id}): node
        for unique_id, node in executed_manifest["nodes"].items()
    }
    return results_by_output_name, manifest_by_output_name


def _output_metadata(event, result, node):
    """Build the extra metadata attached to one dbt node's Output.

    Args:
        event: the raw dbt event that produced the Output; its ``node_info``
            supplies the start and finish timestamps.
        result: the node's entry from run_results.json.
        node: the node's entry from the executed manifest.json.
    """
    event_node_info = event.raw_event["data"]["node_info"]
    started_at = parser.isoparse(event_node_info["node_started_at"])
    completed_at = parser.isoparse(event_node_info["node_finished_at"])

    adapter_response = result["adapter_response"]
    rows_affected: Optional[int] = adapter_response.get("rows_affected", 0)
    query_id: Optional[str] = adapter_response.get("query_id")
    compiled_sql: Optional[str] = node.get("compiled_code")

    # Key order matches the order in which metadata is displayed.
    metadata = {"rows_affected": rows_affected or 0}
    if compiled_sql:
        metadata["compiled_sql"] = MetadataValue.md(compiled_sql)
    metadata["Execution Started At"] = started_at.isoformat(timespec="seconds")
    metadata["Execution Completed At"] = completed_at.isoformat(timespec="seconds")
    metadata["Execution Duration"] = (completed_at - started_at).total_seconds()
    if query_id:
        metadata["query_id"] = query_id
        metadata["link_to_query"] = _SNOWFLAKE_QUERY_URL + query_id
    return metadata


#Load definitions into dbt assets for dagster
@dbt_assets(
    manifest=manifest,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(
    context: AssetExecutionContext, dbt: DbtCliResource, slack: SlackResource, config: MyDbtConfig
):
    """Run the configured dbt command and yield its events, enriched with run metadata.

    Each Output event gets rows affected, compiled SQL, execution timestamps
    and duration, plus the Snowflake query id and console link when available.
    dbt runs with ``raise_on_error=False`` so every event is yielded first.
    A ``Failure`` is raised at the end if the invocation did not succeed.
    """
    dbt_build_args = [config.run_build_or_test]
    if config.full_refresh:
        dbt_build_args.append("--full-refresh")
    if config.fail_fast:
        dbt_build_args.append("--fail-fast")

    dbt_cli_task = dbt.cli(dbt_build_args, context=context, raise_on_error=False)
    # Drain the stream first: the artifacts read below only exist once dbt has exited.
    events = list(dbt_cli_task.stream_raw_events())

    translator = CustomDagsterDbtTranslator()
    # Artifacts are loaded once, lazily on the first Output, and regardless of
    # overall success. Nodes that did succeed therefore still get metadata when
    # other nodes fail (assumes dbt writes run_results.json on partial failure).
    artifacts = None
    for event in events:
        for dagster_event in event.to_default_asset_events(
            manifest=manifest, dagster_dbt_translator=translator
        ):
            if isinstance(dagster_event, Output):
                if artifacts is None:
                    artifacts = _load_artifacts_by_output_name(dbt_cli_task)
                results_by_output_name, manifest_by_output_name = artifacts
                output_name = dagster_event.output_name
                context.add_output_metadata(
                    metadata=_output_metadata(
                        event,
                        results_by_output_name[output_name],
                        manifest_by_output_name[output_name],
                    ),
                    output_name=output_name,
                )
            yield dagster_event

    if not dbt_cli_task.is_successful():
        raise Failure(description="At least one dbt model or test has failed.")
from dagster.
I see both locally and in our production deployment which runs using ECS.
Is your local deployment a long-lived one? I'm curious what `dagster instance info` looks like for both. My theory is that you are in an old DB schema state that we don't force a migration from. `dagster instance migrate` is the command that upgrades the DB schema.
from dagster.
Hi @alangenfeld - I'm on the same team as Paul. The local deployment was just using `dagster dev`, so it isn't a long-lived one. Also, we ran `dagster instance migrate` about 3 months ago - is that too far back?
from dagster.
At this point I am just speculating on possible causes as all of my attempts to reproduce have been unsuccessful.
Are all of your `dagster` packages on the same version?
from dagster.
Ah gotcha, thanks for the context :) And yes, they are all on the same version!
from dagster.
Hi @alangenfeld / team - just following up here! Let us know if you need any other context from us
from dagster.
Without the ability to reproduce I am not sure how to efficiently find the root problem on our end. Some ideas on how you could dig in on your end:
- Step through `instance.get_latest_materialization_code_versions(asset_keys)` with a debugger (or add `print` statements) in the two versions, and identify precisely where things diverge.
- Check out `dagster` locally, run `pip install -e python_modules/dagster`, and use `git bisect` to identify the exact commit that breaks things. You should be able to write a script that calls `instance.get_latest_materialization_code_versions(asset_keys)` and use it with `git bisect run` (https://git-scm.com/docs/git-bisect#_bisect_run).
from dagster.
Related Issues (20)
- Hooks: slack_on_success and slack_on_failure fail silently to work
- [dagster-deltalake] GcsConfig ImportError and TypeError for partitioned assets
- Different UX when viewing runs locally than in production because of additional tags
- `load_asset_checks_from_module` sometimes returns `AssetsDefinition`s instead of `AssetChecksDefinition`s
- ModuleNotFoundError: No module named 'dbt.adapters.base.impl' HOT 4
- Support tags with colon (:) HOT 3
- Propagate filters when navigating through catalog search results
- dagster_pipes.DagsterPipesError: Cannot send message after pipes context is closed." HOT 2
- Cannot create asset job with BackfillPolicy.multi_run() and backfill_policy=None
- GCS IO manager connection error "AttributeError: 'NoneType' object has no attribute 'get_client'" HOT 3
- Dagster-pipes report_asset_materialization is missing `partition` versioning
- Daemon error on code location reload HOT 3
- Dagster-pipes errors out even though run was successfull HOT 1
- Allow easily backfilling all failed partitions for different assets with different partitions HOT 2
- @io_manager(config_schema=...) fails to resolve silently after pydantic update >=2.7.1
- Provide more flexible launch-time configuration composition options for Kubernetes
- Allow multiple partition selection in launch pad
- Launchpad on asset shows all resources instead of the required ones HOT 2
- Missing env var causes silent failure of materialization from UI HOT 1
- Include .gitignore in Dagster project scaffolding
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from dagster.