Koby Kilimnik
06/18/2022, 8:31 AM
Son Giang
06/18/2022, 11:21 AM
ygt1qa
06/19/2022, 2:46 AM
@op(out=Out(io_manager_key="obj_io_manager"))
def op1(context):
    # 1. I define some dynamic values in ops
    some_value = "aaa"
    return something...
class ObjIOManager(IOManager):
    def handle_output(self, context, obj):
        # 2. I want to get and use op1's some_value
        print(some_value)
    def load_input(self, context):
        # 2. I want to get and use op1's some_value
        print(some_value)
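A minimal sketch of one possible workaround (not the only approach; the field names are made up for illustration): return the dynamic value alongside the data, so the IO manager can read it straight off the object it receives.
from dagster import IOManager, Out, io_manager, op

@op(out=Out(io_manager_key="obj_io_manager"))
def op1(context):
    some_value = "aaa"
    data = "the real output"
    # bundle the dynamic value with the data so handle_output can see it
    return {"some_value": some_value, "data": data}

class ObjIOManager(IOManager):
    def handle_output(self, context, obj):
        # obj is exactly what op1 returned, so the dynamic value rides along
        print(obj["some_value"])

    def load_input(self, context):
        # load_input only sees what handle_output persisted;
        # persist some_value too if it is needed here
        ...

@io_manager
def obj_io_manager(_):
    return ObjIOManager()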
Barry Sun
06/20/2022, 12:13 AM
dagster.yaml file to define which daemons to run, which is located in the same folder as workspace.yaml. I am trying to run a Backfill; for this, which daemon do I need, and how do I define it in my dagster instance file?
Barry Sun
06/20/2022, 5:21 AM
load_assets_from_modules
is not working for me - I want to be able to load all the assets from a module (e.g. raw_assets) and apply the 'group_name'. I seem to be able to run this function, but how do I input it into with_resources? I get the error AttributeError: 'list' object has no attribute 'with_resources'.
2. Be able to build job from assets. I've tried using define_asset_job
but I just can't find clear documentation on the parameters inside, and how it fits into the final repository definition. Looking at the API docs, I'm not sure how to input my multi-key assets.
I'm not sure if it's me but I'm having a hard time finding enough documentation around the new functions 😕
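A rough sketch of how the pieces might fit together (assuming a mid-2022 API; db_resource and the raw_assets module are placeholders): with_resources is a standalone function applied to the asset list, and define_asset_job can select the whole group.
from dagster import (
    AssetSelection,
    define_asset_job,
    load_assets_from_modules,
    repository,
    resource,
    with_resources,
)
from my_project import raw_assets  # placeholder module holding the assets

@resource
def db_resource(_):
    return "db-connection"  # placeholder resource

# load every asset in the module and tag it with a group name
raw = load_assets_from_modules([raw_assets], group_name="raw")

# with_resources is called on the list, not a method of it
raw = with_resources(raw, resource_defs={"db": db_resource})

# a job that materializes everything in the "raw" group
raw_job = define_asset_job(name="raw_job", selection=AssetSelection.groups("raw"))

@repository
def my_repo():
    return [*raw, raw_job]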
Oliver
06/20/2022, 6:25 AM
Dmitry Mogilevsky
06/20/2022, 7:50 AM
with_resources
to create multiple assets from the same graph with varying configs, using the code below
@op(required_resource_keys={"a"})
def op1(context):
    return context.resources.a["v"] + 1

@op
def op2(context, op1):
    return op1 + 1

@graph
def mygraph():
    return op2(op1())

@job(resource_defs={"a": make_values_resource(v=int)})
def myjob():
    mygraph()

VALUE_OPTIONS = [0, 1, 2]
myassets = [
    with_resources(
        [AssetsDefinition.from_graph(mygraph)],
        resource_defs={"a": make_values_resource(v=int)},
        resource_config_by_key={"a": {"config": {"v": v}}},
    )[0]
    for v in VALUE_OPTIONS
]

@repository
def test_repo():
    return [*myassets, myjob]
However, I get the following error when I attempt to load it
dagster.core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'a' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
Is it not possible to specify multiple assets from a single graph that vary only in input config?
I realise this is a classic case for Static Partitioning, but I'm trying to do this without using Static Partitioning to get around the fact that I can't partition across multiple directions, as mentioned in a previous thread.
Mykola Palamarchuk
06/20/2022, 10:46 AM
@graph
def get_and_persist_page(page_idx):
    page_data = get_page(page_idx)  # from external API
    persist_original(page_idx, page_data)  # persist to track history
    return to_dataframe(page_idx, page_data)  # convert to Pandas DataFrame

@graph
def my_job_graph():
    page_indexes = get_page_indexes()  # DynamicOutput, get list of pages from external API
    list_of_dfs = page_indexes.map(get_and_persist_page).collect()
    combined_df = combine(list_of_dfs)
    save_to_warehouse(combined_df)
It works this way. I like that it is possible to configure retry-policy for "get_page" op. But I'm curious if there is another elegant way to do this?
What bothers me:
• I don't know how to define a type for the list of Pandas DataFrames produced by collect() (see the sketch below)
• It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?
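For the first bullet, a minimal sketch (untested; it assumes Dagster will resolve an ordinary typing annotation on the fan-in input, the way op type annotations are normally resolved): type the downstream op that receives the collected list.
from typing import List

import pandas as pd
from dagster import op

@op
def combine(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    # dfs is the collected output of the mapped graph
    return pd.concat(dfs, ignore_index=True)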
Chris Histe
06/20/2022, 3:42 PM
Exception: No docker image specified by the instance config or repository
when running dagster job launch -j job -c config.yaml
config.yaml:
execution:
  config:
    image: ghcr.io/user/image:latest
    registry:
      url: https://ghcr.io
      username: user
      password: password
dagster.yaml:
run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
There is very little documentation. And I’m struggling to fix this. What am I doing wrong?
Thanks
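A guess at what may be missing (sketch only; the exact schema depends on your dagster-docker version): DockerRunLauncher can take the image and registry in its own config block in dagster.yaml, for example:
run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    image: ghcr.io/user/image:latest
    registry:
      url: ghcr.io
      username: user
      password: password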
Alessandro Facchin
06/20/2022, 6:02 PM
Charles Leung
06/20/2022, 7:27 PM
Mark Atkins
06/20/2022, 10:42 PM
snowflake-connector-python
package, I'm not currently using the dagster-snowflake implementation because I'd like to be able to use the web based browser authentication for this resource on a local machine.
Setting up the resource is no trouble, but something I'm noticing is that if I have multiple ops that consume this resource, the browser auth is triggered for every distinct op that utilizes the resource. Reading through the documentation it conceptually makes sense that this would happen, basically executing the code within a resource definition at every op. But is there any way to instantiate a connection object/cursor once at the beginning of a job within a resource, so that each time the resource is utilized in an op it doesn't attempt to recreate the object?
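A sketch of one way to hold a single connection per resource initialization (assuming snowflake-connector-python's externalbrowser authenticator; note that with the default multiprocess executor every op runs in its own process, so the resource, and therefore the browser prompt, is still initialized once per op process):
import snowflake.connector
from dagster import resource

@resource(config_schema={"account": str, "user": str})
def snowflake_browser_conn(context):
    # one connection per resource initialization, shared by all ops
    # that run in the same process and require this resource
    conn = snowflake.connector.connect(
        account=context.resource_config["account"],
        user=context.resource_config["user"],
        authenticator="externalbrowser",
    )
    try:
        yield conn
    finally:
        conn.close()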
Jeremy
06/20/2022, 11:30 PM
Jeremy
06/21/2022, 2:41 AM
Jeremy
06/21/2022, 4:04 AM
@op(
    out={
        "ip_metadata": Out(
            asset_key=AssetKey(["schema", "table"]),
            asset_partitions=context.partition_key,
            metadata={"partition_expr": "created_on"},
            io_manager_key="snowflake_io_manager",
        )
    }
)
how do i set the asset_partitions from the context for this op?
Sathish
06/21/2022, 4:18 AM
Son Giang
06/21/2022, 7:56 AM
AssetsDefinition.from_graph()?
Son Giang
06/21/2022, 8:54 AM
Lucia Ambrogi
06/21/2022, 10:12 AM
Output
object to attach metadata to my op’s output (based on this example). How can I access this metadata from a downstream op?
MO
06/21/2022, 10:33 AM
David Hyman
06/21/2022, 11:21 AM
Rszk
06/21/2022, 11:33 AM
Megan Beckett
06/21/2022, 1:25 PM
@graph
def update_db_metadata():
    """
    Generalised job that will pull all metadata required. There is a test and prod instance of each below.
    """
    process_data(pull_data(connect_api()))

# Test job - will use local testing database connection
update_db_metadata_test_job = update_db_metadata.to_job(
    name='update_db_metadata_test_job',
    resource_defs={"database": database_connection},
    config=config_from_files(
        [file_relative_path(__file__, "../config/run_config_test.yaml")]
    ),
)

# Production job - will use production database connection
update_db_metadata_prod_job = update_db_metadata.to_job(
    name='update_db_metadata_prod_job',
    resource_defs={"database": database_connection},
    config=config_from_files(
        [file_relative_path(__file__, "../config/run_config_prod.yaml")]
    ),
)
In the Dagit UI, I can then run either the testing or the production job (this isn't deployed yet either).
Is this the recommended way to handle connecting to different DB environments in testing vs production? Or how else?
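One alternative sketch (assuming database_connection is a configurable resource; the config keys here are placeholders): bind the environment-specific settings to the resource with .configured() instead of carrying them in separate run-config files.
# test and prod variants of the same resource, configured up front
test_db = database_connection.configured({"host": "localhost", "database": "metadata_test"})
prod_db = database_connection.configured({"host": "prod-db.internal", "database": "metadata"})

update_db_metadata_test_job = update_db_metadata.to_job(
    name="update_db_metadata_test_job", resource_defs={"database": test_db}
)
update_db_metadata_prod_job = update_db_metadata.to_job(
    name="update_db_metadata_prod_job", resource_defs={"database": prod_db}
)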
Jeremy
06/21/2022, 1:45 PM
Mykola Palamarchuk
06/21/2022, 2:32 PM
RetryRequested? And how does it work internally?
My case: an HTTP request may fail sometimes with different errors that require individual retry policies. E.g. one response means we have to wait 30 seconds and can retry only once (NotReadyYetException), but another response requires an exponential timeout and can be retried more times (TemporaryServerOutageException). Is it possible to model that somehow?
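A sketch of one way to model this (NotReadyYetException, TemporaryServerOutageException and call_api are the hypothetical names from the question): catch each error inside the op and raise RetryRequested with per-error parameters, rather than relying on a single RetryPolicy.
from dagster import RetryRequested, op

class NotReadyYetException(Exception):  # hypothetical, from the question
    pass

class TemporaryServerOutageException(Exception):  # hypothetical, from the question
    pass

@op
def get_page(context, page_idx):
    try:
        return call_api(page_idx)  # hypothetical API call
    except NotReadyYetException as e:
        # fixed 30 second wait, at most one retry
        raise RetryRequested(max_retries=1, seconds_to_wait=30) from e
    except TemporaryServerOutageException as e:
        # rough exponential backoff keyed off how many retries have already happened
        raise RetryRequested(max_retries=5, seconds_to_wait=2 ** context.retry_number) from e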
Jay Jackson
06/21/2022, 5:59 PM
dagster.core.errors.DagsterInvalidDefinitionError: Found invalid cron schedule '* * 0 * *' for schedule
Is * * 0 * * not a 5-field cron expression?
Zach
06/21/2022, 6:40 PM
{
  "error": {
    "data": null,
    "errors": [
      {
        "message": "Variable \"runId\" of type \"ID\" used in position expecting type \"ID!\".",
        "locations": [
          {
            "line": 1,
            "column": 18
          },
          {
            "line": 2,
            "column": 21
          }
        ]
      }
    ]
  }
}
and my query:
query LogsForRun($runId: ID) {
  logsForRun(runId: $runId) {
    __typename
    ... on EventConnection {
      events {
        __typename
        ... on ExecutionStepStartEvent {
          runId
          timestamp
          stepKey
        }
      }
    }
  }
}
with this variable:
{
  "runId": "1d71f590-e925-42eb-af6c-831d524e1945"
}
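Judging from the error text, the variable is declared as a nullable ID while logsForRun expects ID!, so declaring it non-null should satisfy the type check (sketch with a trimmed selection set):
query LogsForRun($runId: ID!) {
  logsForRun(runId: $runId) {
    __typename
  }
}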
Harpal
06/21/2022, 7:03 PMdagster._check.CheckError: Invariant failed. Description: Invalid asset dependencies: {AssetKey(['sector_cls__hold__train_set_csv_rig_refactor'])} specified in `internal_asset_deps` argument for multi-asset '_assets' on key 'sector_cls__hold__train_set_csv_rig_refactor'. Each specified asset key must be associated with an input to the asset or produced by this asset. Valid keys: {AssetKey(['public', 'sector_cls__hold__eval_set_csv']), AssetKey(['public', 'sector_cls__hold__isolate_all_caf']), AssetKey(['public', 'sector_cls__hold__isolate_test_caf']), AssetKey(['public', 'sector_cls__hold__isolate_test_haw']), AssetKey(['public', 'sector_cls__hold__train_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__not_test_wak']), AssetKey(['public', 'sector_cls__hold__train_gcal']), AssetKey(['public', 'sector_cls__hold__isolate_test_wak']), AssetKey(['public', 'sector_cls__hold__train_caf']), AssetKey(['public', 'sector_cls__hold__eval_set_ids']), AssetKey(['public', 'sector_cls__hold__test_set_csv']), AssetKey(['public', 'sector_cls__hold__not_test_caf']), AssetKey(['public', 'sector_cls_crunchbase_data']), AssetKey(['public', 'sector_cls__hold__not_test_set_ids']), AssetKey(['gcs', 'sector_cls__hold__train_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__train_unk']), AssetKey(['public', 'sector_cls__hold__isolate_all_unk']), AssetKey(['public', 'sector_cls__hold__eval_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__not_test_unk']), AssetKey(['public', 'sector_cls__hold__test_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__train_wak']), AssetKey(['public', 'sector_cls__hold__isolate_all_haw']), AssetKey(['public', 'sector_cls__hold__isolate_all_wak']), AssetKey(['public', 'sector_cls__hold__isolate_test_set_ids']), AssetKey(['public', 'sector_cls__hold__isolate_test_gcal']), AssetKey(['public', 'sector_cls__hold__train_set_ids']), AssetKey(['public', 'sector_cls__hold__isolate_test_unk']), AssetKey(['public', 'sector_cls__hold__not_test_gcal']), AssetKey(['public', 'sector_cls__hold__train_haw']), AssetKey(['public', 'sector_cls__hold__train_set_csv']), AssetKey(['public', 'sector_cls__hold__not_test_haw']), AssetKey(['gcs', 'sector_cls__hold__test_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__isolate_all_gcal']), AssetKey(['gcs', 'sector_cls__hold__eval_set_csv_rig_refactor'])}
Stream closed EOF for dagstertest/dagster-dagster-user-deployments-moonfire-dagster-repo-6c5p8z8k (dagster-user-deployments)
Any help would be much appreciated :partydagster:
Code snippets available in the comments!
Eric Larson
06/21/2022, 7:33 PM
dagster.core.errors.DagsterInvalidDefinitionError: "meta" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.
dbt_assets = load_assets_from_dbt_project(
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 364, in load_assets_from_dbt_project
return load_assets_from_dbt_manifest(
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 433, in load_assets_from_dbt_manifest
dbt_assets_def = _dbt_nodes_to_assets(
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 272, in _dbt_nodes_to_assets
return AssetsDefinition(
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/asset_defs/assets.py", line 85, in __init__
self._group_names_by_key[key] = validate_group_name(group_name)
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/definitions/utils.py", line 122, in validate_group_name
return check_valid_name(group_name)
File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/definitions/utils.py", line 56, in check_valid_name
raise DagsterInvalidDefinitionError(
Jay Jackson
06/21/2022, 7:55 PM
dagster-daemon
and I'm not seeing it in the dagit UI ..
dagster-daemon run -w dagster-workspace.yaml
@schedule(
    cron_schedule="* * * * *",
    pipeline_name="my_job",
    job=my_job,
)
def my_job_schedule(context):
    # Find runs of the same job that are currently running
    run_records = context.instance.get_run_records(
        RunsFilter(job_name="my_job", statuses=[DagsterRunStatus.STARTED])
    )
    # Kick off a run only if no other runs of the same job are running
    if len(run_records) == 0:
        yield RunRequest()
dagster schedule start, that cmd is giving me There are no schedules defined for repository also
rex
06/21/2022, 8:01 PM
@repository
that you specified in your workspace.yaml. Could you show your repository definition?
Jay Jackson
06/21/2022, 8:02 PM
dagster-workspace.yaml
->
load_from:
  - python_file:
      relative_path: data/workflow-recommendations.py
workflow-recommendations.py, I only have the @job decorator, do i need the @repository as well?
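A minimal sketch of what rex appears to be pointing at (my_job and my_job_schedule are the names from the snippets above): the file loaded by the workspace needs a @repository that returns both the job and the schedule, otherwise neither shows up in Dagit or in dagster schedule start.
from dagster import repository

@repository
def my_repository():
    # both the job and its schedule must be returned here
    # for dagit and the daemon to pick them up
    return [my_job, my_job_schedule]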
rex
06/21/2022, 8:12 PM