Abhinav Ayalur
12/05/2022, 6:50 PM
Abhinav Ayalur
12/05/2022, 7:18 PM
Huy Dao
12/05/2022, 7:27 PM
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,
)
def report_status_sensor(context):
    # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
    if context.dagster_run.job_name != status_reporting_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")
Is there any way I can get the run config of the job from context.dagster_run? I want to check the previous run_config before I trigger another job. Many thanks
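One possible answer (a sketch, not verified against this exact Dagster version): the DagsterRun object exposes a run_config attribute, so the sensor can inspect the finished run's config before building the next RunRequest. The some_flag key below is purely hypothetical.
from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS, request_job=status_reporting_job)
def report_status_with_previous_config(context):
    # config that the just-finished run was launched with
    previous_run_config = context.dagster_run.run_config
    if previous_run_config.get("ops", {}).get("some_flag"):  # hypothetical check
        return SkipReason("previous run already carried this config")
    return RunRequest(run_key=None, run_config=previous_run_config)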
Byron Murillo
12/05/2022, 8:59 PM
Yang
12/05/2022, 9:22 PM
context = build_op_context(
    config={"exec_path": ""},
    partition_key=MultiPartitionKey({"fiscal_year": "2021", "dataset": "idealratings"}),
)
Alex Prykhodko
12/06/2022, 12:21 AM
Loading a partitioned upstream asset into an unpartitioned downstream asset fails with s3_pickle_io_manager. Works as expected when using fs_io_manager (the upstream input argument is a dict with keys as partitions).
Code:
@asset(partitions_def=StaticPartitionsDefinition(get_partitions()))
def sa_metrics_normalized(context: OpExecutionContext, sa_metrics_raw):
    ...

@asset
def sa_metrics_data_frame(context: OpExecutionContext, sa_metrics_normalized):
    ...
Error:
dagster._check.CheckError: Failure condition: Tried to access partition key for input 'sa_metrics_normalized' of step 'sa_metrics_data_frame', but the step input has a partition range: '2014-01' to '2016-12'.
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/inputs.py", line 856, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 72, in load_input
key = self._get_path(context)
File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 33, in _get_path
path = context.get_asset_identifier()
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 409, in get_asset_identifier
return [*self.asset_key.path, self.asset_partition_key]
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 324, in asset_partition_key
return self.step_context.asset_partition_key_for_input(self.name)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/system.py", line 915, in asset_partition_key_for_input
check.failed(
File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1642, in failed
raise CheckError(f"Failure condition: {desc}")
Abhinav Ayalur
12/06/2022, 12:56 AM
Mycchaka Kleinbort
12/06/2022, 12:33 PM
I have an asset mostActiveUserId that runs on a daily schedule (it pulls the latest user activity data from Snowflake and returns the userId of the most active user). Most days the most active user is the same as yesterday. In this specific scenario, I only want to mark the downstream tasks as stale if the mostActiveUserId asset value has changed.
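One way to express this (a sketch, assuming a Dagster release where data versions are available; query_most_active_user is a hypothetical Snowflake helper): attach a DataVersion that only changes when the value changes, so downstream assets are only flagged as stale on days the user actually changed.
from dagster import DataVersion, Output, asset

@asset
def mostActiveUserId():
    user_id = query_most_active_user()  # hypothetical Snowflake query helper
    # the data version tracks the value itself, so downstream staleness follows value changes
    return Output(user_id, data_version=DataVersion(str(user_id)))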
Tamas Juhasz
12/06/2022, 1:05 PM
Execution of run for "" failed. Execution was interrupted unexpectedly. No user initiated termination request was found, treating as failure.
I've no clue where this termination comes from.
Damian
12/06/2022, 1:08 PM
Casper Weiss Bang
12/06/2022, 1:36 PM
docker.errors.NotFound: 404 Client Error for http+docker://localhost/v1.41/containers/cb639ffa1ed57b0cd6581e81de37bb4395bfb4cb7f398d03b2b15a9eabbcea37/start: Not Found ("network dagster_network not found")
I can see docker-compose gives it another name, username_dagster_network, i.e. the name of my current user plus the network. You might want to document that somewhere. Or am I doing something wrong?
Mehdi Hasanvandy
12/06/2022, 2:54 PM
Mehdi Hasanvandy
12/06/2022, 2:59 PM
Kirk Stennett
12/06/2022, 4:14 PM
Derek Truong
12/06/2022, 5:33 PM
Zachary Bluhm
12/06/2022, 7:54 PM
dagsterApiGrpcArgs?
nickvazz
12/06/2022, 9:00 PM
Is it possible to use DynamicPartitionsDefinition with assets?
import glob
import os

from dagster import asset, DynamicPartitionsDefinition

def get_partitions(_):
    return map(os.path.basename, glob.glob("/some/path/*"))

@asset(
    group_name='test_group',
    # partitions_def=DynamicPartitionsDefinition(get_partitions),  # this line makes it fail
)
def partitioned_asset(context):
    context.log.info()
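For reference, a sketch of the named dynamic-partitions API that later Dagster releases added for this case (assuming a version that supports it): partition keys are registered on the instance, e.g. instance.add_dynamic_partitions("files", [...]), rather than computed by a callable.
from dagster import DynamicPartitionsDefinition, asset

files_partitions = DynamicPartitionsDefinition(name="files")

@asset(group_name="test_group", partitions_def=files_partitions)
def partitioned_asset(context):
    # the key of the partition currently being materialized
    context.log.info(context.partition_key)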
Binh Pham
12/06/2022, 9:33 PM
How should I handle a dbt model that sets an alias config?
dbt model with the file name my_schema___my__table.sql:
{{ config(alias="my_table") }}
SELECT 1
the asset key will be my_schema / my_schema___my__table
downstream asset using the dagster-snowflake io manager:
@asset(
    ins={"my_schema___my__table": AssetIn(key_prefix=["my_schema"])},
    ...
)
def my_asset(my_schema___my__table: DataFrame):
    ...
the io manager would do a select * from my_schema.my_schema___my__table, which doesn't exist
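One possible workaround (a sketch, not tested against this setup): when loading the dbt assets, derive the asset key from the dbt alias instead of the file name, so the snowflake IO manager resolves my_schema.my_table; the fields read from node_info below are assumed to be the usual dbt manifest fields.
from dagster import AssetKey
from dagster_dbt import load_assets_from_dbt_project

def node_info_to_asset_key(node_info):
    # prefer the configured alias, falling back to the model name
    name = node_info.get("alias") or node_info["name"]
    return AssetKey([node_info["schema"], name])

dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt_project",  # hypothetical path
    node_info_to_asset_key=node_info_to_asset_key,
)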
Chris Anderson
12/07/2022, 12:16 AM
Is there a way to use .collect() with a dynamic op that's been turned into an asset, from within another asset that has it as a dependency? For example, I have
@op(out=DynamicOut())
def observations(context) -> DynamicOutput[gpd.GeoDataFrame]:
    # yield observations in chunks
This op gets turned into an asset with its own group later with AssetsDefinition.from_op that's used across many jobs. In some jobs this dynamic behavior is beneficial and sought after, but in others I'd like to collect all the information before proceeding, like below
@asset(
    ins={'observations': AssetIn('observations')},
    outs={'collective_analysis': AssetOut()}
)
def collective_analysis(context, observations):
    collected_obs = observations.collect()
    # do analysis with all observations collected into one dataframe
Doing this code right now throws the following error and tries to split this collective_analysis asset materialization dynamically according to the original dynamic op:
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "collective_analysis"::AttributeError: 'GeoDataFrame' object has no attribute 'collect'
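One possible approach (a sketch, not a confirmed recommendation): do the fan-in inside a graph and expose the graph as a single asset, so .collect() runs before the analysis step; analyze_collected is a hypothetical op name.
from dagster import AssetsDefinition, graph, op

@op
def analyze_collected(collected_obs):
    # collected_obs arrives as a list of GeoDataFrames; combine and analyze them here
    ...

@graph
def collective_analysis_graph():
    # .collect() is valid inside a graph and fans in all dynamic outputs of observations
    return analyze_collected(observations().collect())

collective_analysis = AssetsDefinition.from_graph(collective_analysis_graph)
Note that in this form the observations op executes as part of the graph-backed asset rather than being loaded from a prior upstream materialization.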
Gatsby Lee
12/07/2022, 3:46 AM
dagster._core.errors.DagsterInvalidDefinitionError: "log__<dagster._core.definitions.composition.InvokedSolidOutputHandle object at 0x1088f1f90>" is not a valid name in Dagster. Names must be in regex ^[A-Za-z0-9_]+$.
Here is the sample code.
@op(out={"step_name": Out()})
def op__get_config():
    step_name = "hello-dagster"
    return step_name

def generate_op(step_name: str):
    @op(name=f"log__{step_name}")
    def func():
        print(f"hello-{step_name}")
    return func

@job
def job() -> None:
    step_name = op__get_config()
    log_op = generate_op(step_name)
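A sketch of one way around this: op names must be static strings, so a value produced by another op cannot be used to name an op at definition time; instead, pass the value as a runtime input.
from dagster import job, op

@op
def op__get_config() -> str:
    return "hello-dagster"

@op
def log_step(context, step_name: str) -> None:
    # the step name arrives as a runtime input instead of being baked into the op name
    context.log.info(f"hello-{step_name}")

@job
def hello_job():
    log_step(op__get_config())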
Gatsby Lee
12/07/2022, 3:47 AM
Gatsby Lee
12/07/2022, 4:45 AM
William
12/07/2022, 7:10 AM
shift+click on asset graph page
William
12/07/2022, 7:25 AM
get_partitions method?
Sơn Lê
12/07/2022, 7:54 AM
When running dagit, ImportError: cannot import name 'introspection_query' from 'graphql' (/workspace/.pyenv_mirror/user/current/lib/python3.8/site-packages/graphql/__init__.py). However, according to the tutorial, dagster should run properly. Is there any fix?
William
12/07/2022, 8:05 AM
Vrushank Kenkre
12/07/2022, 10:53 AM
Traceback (most recent call last):
File "/home/hadoop/.local/lib/python3.7/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
return importlib.import_module(module_name)
File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'with_pyspark_emr'
`dagster._core.errors.DagsterImportError: Encountered ImportError: No module named 'with_pyspark_emr' while importing module with_pyspark_emr. Local modules were resolved using the working directory /home/ec2-user/dagster/my-dagster-project. If another working directory should be used, please explicitly specify the appropriate path using the -d or --working-directory for CLI based targets or the working_directory configuration option for workspace targets.`
I have set up dagster on a dev EC2 machine and am trying to run the job on EMR. The module with_pyspark_emr is present in /home/ec2-user/dagster/my-dagster-project, but I am not able to figure out what the issue is. Can someone please help?
Daniel Galea
12/07/2022, 12:23 PM
@schedule(
    job=custom_job
)
def configurable_job_schedule(context: ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        tags={"date": scheduled_date}
    )
This requires a pre-defined job created with the @job decorator, which I can't use because I need to pass a custom configuration to my job, and that cannot be done in the decorator. I am defining my jobs through graphs and my schedule via a job in the following way:
from graphs.my_graph import my_pipeline

def my_job(configs: dict):
    logger = get_dagster_logger()
    custom_config = {
        "ops": {
            "launch_emr_cluster": {"config": configs["emr_configs"]},
        }
    }
    job = emr_pipeline.to_job(
        name=configs["job_name"],
        description=configs["job_description"],
        partitions_def=DailyPartitionsDefinition(
            start_date="2022-11-09", timezone="Europe/Amsterdam"
        ),
        tags={
            "dagster-k8s/config": {
                "job_spec_config": {"ttl_seconds_after_finished": 300}
            }
        },
        config=custom_config,
        resource_defs={
            "emr_job_runner": emr_job_runner
        },
    )
    job_schedule = build_schedule_from_partitioned_job(
        job,
        default_status=DefaultScheduleStatus.RUNNING,
    )
    return job, job_schedule
The config parameter is loaded from a JSON file and looks like the snippet below. The reason I do this is that I have a list of dictionaries which I use to dynamically create jobs.
{
    "emr_release_label": "emr-6.8.0",
    "cluster_name": "Dagster Cluster",
    "master_node_instance_type": "m5.xlarge",
    "worker_node_instance_type": "m5.xlarge",
    "worker_node_instance_count": 1,
    "ec2_subnet_id": "",
    "worker_node_spot_bid_price": "",
    "job_name": "JOB",
    "job_description": "JOB DESCRIPTION"
}
I cannot pass this config parameter to a job which is declared in the following way:
@job(
    name="daily_etl",
    description="Daily ETL job",
    partitions_def=DailyPartitionsDefinition(
        start_date="2022-11-09", timezone="Europe/Amsterdam"
    ),
    tags={
        "dagster-k8s/config": {
            "job_spec_config": {"ttl_seconds_after_finished": 300}
        }
    },
    config=custom_config,  # I DON'T WANT TO HARD CODE THIS CONFIG
    resource_defs={
        "emr_job_runner": emr_job_runner
    },
)
def my_job():
    step2(step1())
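A sketch of one alternative (assuming emr_pipeline is the graph shown above; load_configs and the cron string are hypothetical): a schedule can target a job built with to_job() and supply the op config per tick through RunRequest.run_config, so nothing has to be hard-coded in a decorator.
from dagster import RunRequest, ScheduleEvaluationContext, schedule

emr_job = emr_pipeline.to_job(
    name="daily_etl",
    resource_defs={"emr_job_runner": emr_job_runner},
)

@schedule(job=emr_job, cron_schedule="0 6 * * *")
def daily_etl_schedule(context: ScheduleEvaluationContext):
    configs = load_configs()  # hypothetical helper that reads the JSON file
    return RunRequest(
        run_key=None,
        run_config={"ops": {"launch_emr_cluster": {"config": configs["emr_configs"]}}},
        tags={"date": context.scheduled_execution_time.strftime("%Y-%m-%d")},
    )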
AJ Floersch
12/07/2022, 1:27 PM
Arthur
12/07/2022, 4:16 PM
@asset(ins={"app_accounts": AssetIn(input_manager_key="stagecoach_io_manager")})
def ebdb_accounts(context, app_accounts):
    return app_accounts
but to no avail; it gives this error:
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["app_accounts"]' for asset '["ebdb_accounts"]' is not produced by any of the provided asset ops and is not one of the provided sources
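A sketch of one likely fix (assuming app_accounts is produced outside this code location): declare it as a SourceAsset so Dagster knows where the input comes from, and point it at the stagecoach IO manager.
from dagster import AssetKey, SourceAsset, asset

# app_accounts is materialized elsewhere, so register it as a source asset
app_accounts = SourceAsset(key=AssetKey("app_accounts"), io_manager_key="stagecoach_io_manager")

@asset
def ebdb_accounts(app_accounts):
    return app_accounts
Both the source asset and the downstream asset then need to be included in the repository/Definitions that this code location loads.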