Yang
11/02/2022, 11:14 PM
slack_on_run_failure sensor. Was there some more setup I need to do? It works on staging and prod (k8s agent).
AttributeError: 'NoneType' object has no attribute 'external_repository_origin'
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 320, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 1268, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 1421, in _wrapped_fn
    for item in result:
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/definitions/run_status_sensor_definition.py", line 485, in _wrapped_fn
    location_name=pipeline_run.external_pipeline_origin.external_repository_origin.repository_location_origin.location_name,
Qwame
11/03/2022, 12:19 AM
manifest.json, an asset, to load_assets_from_dbt_manifest? I want to do something like this but it is not working. Am I missing something?
@asset
def dbt_assets(context):
    .........
    return manifest.json

dbt_all_assets = with_resources(
    load_assets_from_dbt_manifest(dbt_assets),
    {'dbt': dbt_cli_resource}
)
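load_assets_from_dbt_manifest builds its asset definitions when the code location loads. It does this from an already-parsed manifest dict, which is its manifest_json argument. An @asset only has a value once a run materializes it, so it can't be passed in at that point. A minimal sketch, assuming manifest.json already exists on disk (for example from `dbt compile`, which writes target/manifest.json). The helper name load_dbt_manifest is hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_dbt_manifest(manifest_path: str) -> Dict[str, Any]:
    """Read a dbt manifest.json from disk and return it as a plain dict.

    Hypothetical helper: the returned dict is what would be passed as
    manifest_json to dagster_dbt's load_assets_from_dbt_manifest.
    """
    path = Path(manifest_path)
    if not path.is_file():
        raise FileNotFoundError(f"dbt manifest not found at {path}; run `dbt compile` first")
    with path.open(encoding="utf-8") as f:
        manifest = json.load(f)
    # A dbt manifest is a JSON object with top-level keys such as "nodes".
    if not isinstance(manifest, dict) or "nodes" not in manifest:
        raise ValueError(f"{path} does not look like a dbt manifest")
    return manifest


# Usage at definition (import) time, assuming dagster and dagster_dbt are installed:
# from dagster import with_resources
# from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_manifest
#
# dbt_all_assets = with_resources(
#     load_assets_from_dbt_manifest(load_dbt_manifest("target/manifest.json")),
#     {"dbt": dbt_cli_resource},
# )
```

The manifest therefore has to exist before Dagster imports the repository, not be produced by an upstream asset in the same run.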
Ismael Rodrigues
11/03/2022, 2:11 AM
Simon Robertsson
11/03/2022, 9:25 AM
run_stuff_placeholder = define_asset_job("run_stuff_placeholder", selection="snowflake/mart_xy/f_1")

@schedule(job=run_stuff_placeholder, cron_schedule="*/5 * * * *")
def configurable_job_schedule():
    return RunRequest(
        asset_selection=[AssetKey("snowflake/mart_xy/f_2")]
    )
Joshua Smart-Olufemi
11/03/2022, 11:47 AM
dagit -f cereal.py command for the cereal.py file I created and saved, but got this error:
Using temporary directory C:\Users\josh\Desktop\dagster\tmphvlnxh7o for storage. This will be removed when dagit exits.
To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
0it [00:00, ?it/s]
0it [00:00, ?it/s]
C:\Users\josh\.virtualenvs\venv\lib\site-packages\dagster\_core\execution\compute_logs.py:42: UserWarning: WARNING: Compute log capture is disabled for the current environment. Set the environment variable PYTHONLEGACYWINDOWSSTDIO to enable.
warnings.warn(WIN_PY36_COMPUTE_LOG_DISABLED_MSG)
2022-11-03 12:31:57 +0100 - dagit - INFO - Serving dagit on http://127.0.0.1:3000 in process 18796
ERROR: [Errno 10048] error while attempting to bind on address ('127.0.0.1', 3000): only one usage of each socket address (protocol/network address/port) is normally permitted
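The last line says another process, often an earlier dagit that is still running, already holds 127.0.0.1:3000. A small stdlib sketch to check whether a port is free before launching. The function name is_port_free is hypothetical.

```python
import socket


def is_port_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a fresh TCP socket can bind to (host, port).

    Binding raises OSError (Errno 10048 on Windows, EADDRINUSE elsewhere)
    when another socket already holds the address.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True
```

If is_port_free(3000) is False, either stop the other process or start dagit on a different port, for example `dagit -f cereal.py -p 3001`.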
Clément Masson
11/03/2022, 12:59 PM
Mark Fickett
11/03/2022, 1:10 PM
Zachary Bluhm
11/03/2022, 2:07 PM
input asset keys from an Ops ExecutionContext?
I see a method to get the outputs, but not the inputs.
I think I can loop through the "ins" and pass them to context.asset_key_for_input. Going to give this a whirl.
Rohan Prasad
11/03/2022, 2:37 PM
Jose Estudillo
11/03/2022, 2:42 PM
Shutting down Dagster code server for package <x> in process <y>
Any hints on why this could be happening?
Nicolas Parot Alvarez
11/03/2022, 3:06 PM
Zachary Bluhm
11/03/2022, 3:26 PM
shailesh
11/03/2022, 4:21 PM
Selene Hines
11/03/2022, 5:05 PM
Rahul Dave
11/03/2022, 5:26 PM
Rahul Dave
11/03/2022, 5:28 PM
Rahul Dave
11/03/2022, 5:38 PM
Rahul Dave
11/03/2022, 5:43 PM
Rahul Dave
11/03/2022, 5:43 PM
Rahul Dave
11/03/2022, 5:45 PM
Rahul Dave
11/03/2022, 5:47 PM
Carlton Duffett
11/03/2022, 6:05 PM
['apple', 'banana', 'pear']
• I would like to materialize all data types at the same time using a @multi_asset, for example to create a fruit_tables asset.
• I know how to create a single asset from a graph using AssetsDefinition.from_graph(), but how do I make that a multi-asset?
Tom Reilly
11/03/2022, 7:50 PM
"Arbitrary-length homogeneous tuples can be expressed using one type and ellipsis, for example Tuple[int, ...]. (The ... here are part of the syntax, a literal ellipsis.)"
When I try to use this on an op, I get the error:
...
E dagster._core.errors.DagsterInvalidDefinitionError: Invalid type: dagster_type must be an instance of DagsterType or a Python type: got Ellipsis.
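The error is consistent with Dagster inspecting each type argument of the annotation. For Tuple[int, ...], the typing module reports those arguments as (int, Ellipsis), and Ellipsis is neither a DagsterType nor a Python type. A stdlib-only sketch showing what those arguments look like. The helper non_type_args is hypothetical. A workaround worth trying, not verified here against 1.0.10, is to annotate the op output as List[int] instead.

```python
from typing import List, Tuple, get_args, get_origin

# Variable-length tuple: the second type argument is the Ellipsis singleton.
variadic = Tuple[int, ...]
# Fixed-length tuple of three ints: every argument is a real class.
fixed = Tuple[int, int, int]


def non_type_args(annotation) -> list:
    """Return the type arguments of `annotation` that are not classes."""
    return [arg for arg in get_args(annotation) if not isinstance(arg, type)]


print(get_origin(variadic), get_args(variadic))  # <class 'tuple'> (<class 'int'>, Ellipsis)
print(non_type_args(variadic))   # [Ellipsis]
print(non_type_args(fixed))      # []
print(non_type_args(List[int]))  # []
```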
using dagster 1.0.10
Prag
11/03/2022, 10:02 PM
nickvazz
11/03/2022, 10:05 PM
config.json, points.csv and sweep.py, where sweep.py loads in config.json & points.csv. This is currently a hard constraint, because the simulation CLI isn't under our control and any parameterization has to be contained within sweep.py. Additionally, this Python is actually Jython and not CPython, so we treat running simulation.exe --run sweep.py as a subprocess CLI call. Currently we embed some of the tracking hashes in the file names (config_{CONFIG_HASH}.json, points_{POINTS_HASH}.csv, sweep_{CONFIG_HASH}_{POINTS_HASH}.py).
What we currently do:
We run a CLI that consumes a directory with an unknown number of these sweep.py files and submits them to dask to run. It chains the following pipeline-able operations using dask's as_completed functionality. We use some Jinja templating to fill in each sweep.py as needed. We separate the runs into batches of points.csv & config.json within a directory (sometimes ~10k+ sweep.py files in one directory), which we call an experiment for lack of a better name. When these run, they create some additional files, meta_{CONFIG_HASH}_{POINTS_HASH}.json & output_{CONFIG_HASH}_{POINTS_HASH}.csv. Those get combined downstream into a final_{CONFIG_HASH}_{POINTS_HASH}.csv, and the entire directory is combined into experiment.parquet. The combining can't happen within the Jython environment because it needs pandas, so dumping intermediate files is easiest. This experiment.parquet will eventually have other transformations + reports generated from it (likely larger `@graph`s containing this @graph and/or @asset), but for now we're focusing on the main path of execution.
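The hash-based file naming described above can be sketched with the standard library. Two assumptions are not stated in the thread: each hash is a SHA-256 hex digest of the file's raw bytes truncated to 12 characters, and run_file_names / parse_run_hashes are hypothetical helper names. Embedding both hashes in every per-run file name lets the meta, output and final files be matched back to the inputs that produced them.

```python
import hashlib
import re
from typing import Dict, Tuple


def short_hash(data: bytes, length: int = 12) -> str:
    """Assumed content hash: SHA-256 hex digest truncated to `length` characters."""
    return hashlib.sha256(data).hexdigest()[:length]


def run_file_names(config_bytes: bytes, points_bytes: bytes) -> Dict[str, str]:
    """Derive every per-run file name from the contents of config.json and points.csv."""
    c = short_hash(config_bytes)
    p = short_hash(points_bytes)
    return {
        "config": f"config_{c}.json",
        "points": f"points_{p}.csv",
        "sweep": f"sweep_{c}_{p}.py",
        "meta": f"meta_{c}_{p}.json",
        "output": f"output_{c}_{p}.csv",
        "final": f"final_{c}_{p}.csv",
    }


# Files that carry both hashes: <kind>_<CONFIG_HASH>_<POINTS_HASH>.<ext>
_RUN_FILE = re.compile(r"^(sweep|meta|output|final)_([0-9a-f]+)_([0-9a-f]+)\.(py|json|csv)$")


def parse_run_hashes(filename: str) -> Tuple[str, str]:
    """Recover (CONFIG_HASH, POINTS_HASH) from a per-run file name."""
    match = _RUN_FILE.match(filename)
    if match is None:
        raise ValueError(f"not a per-run file name: {filename}")
    return match.group(2), match.group(3)
```

Because the pair of hashes is deterministic, it could also serve as a stable identifier for each run when deciding how to model the outputs in Dagster.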
What I am trying to do:
I want to migrate this to dagster / local s3 (minio) / postgres / (potentially dbt, but that still needs some investigation), and I'm having trouble with the asset abstraction in relation to this. In a previous thread (before I could spend my time fully implementing) I was pointed at partitions. Those look more tailored to time windows than to runtime s3 buckets/prefixes (which I'm looking to implement, but they're not required). It seems like I could somehow read the s3 buckets at dagit start/import time to create the partitions with static_partitioned_config. Additionally, we would like to be able to reuse a lot of this main "run these sweep.pys" graph-ish thing in larger graphs / jobs.
Things I think I have the right general idea about:
• Since we would like to reuse the operations that consume a directory of `sweep.py`s, it should be a @graph -- we would want to share some config_schema args along with it, so it requires make_values_resource
• graphs can't be called within graphs, but can be turned into assets that can be used together, I guess? (my previous discussion about graph in graphs)
• to have full control of what is sent to dask / if using DynamicOutput, I can't use the dagster-dask executor (dagster-dask doesn't support `DynamicOutput`), but should instead submit / gather through the dagster-dask resource
Things that aren't quite clear:
• When can I and can I not pass context to an op/asset/job/graph?
• Should a @job only contain a plain def some_job(): graph_1(); graph_2(); graph_3(); with no arguments? Can/should a job/graph have arguments?
• Should every .csv be its own asset? Would that mean a partition for every single .csv file (potentially thousands very quickly)?
So far the asset idea seems like it should fit here somewhere, but it's not super clear where yet. The asset graph visualization is awesome, and I would love to be able to take advantage of it!
Zach
11/03/2022, 11:18 PM
build_op_context helper? It seems that you can't pass in a MagicMock instead, because Dagster ensures that an OpExecutionContext is being passed to the op.
Bennett Norman
11/03/2022, 11:37 PM
Stefan Adelbert
11/04/2022, 3:58 AM
tuple(rrule(WEEKLY, interval=2, dtstart=datetime.date(2022,7,4), until=date))[-2:]
where 04/07/2022 (4 July 2022) is a known historic pay period start date.
I'm considering a sensor that runs once a day and checks whether "today" is the Tuesday after the most recent period. If so, it yields a RunRequest.
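The fortnightly arithmetic can also be done with the standard library alone, without dateutil. A minimal sketch with two assumptions. First, periods are exactly 14 days, anchored on Monday 4 July 2022. Second, "the Tuesday after the most recent period" means the Tuesday right after a period's final Sunday, which is day 1 (0-based) of the next period. The function names are hypothetical.

```python
from datetime import date, timedelta
from typing import Tuple

# Known historic pay period start (a Monday), from the message above.
ANCHOR = date(2022, 7, 4)
PERIOD = timedelta(days=14)


def last_two_period_starts(today: date) -> Tuple[date, date]:
    """Start dates of the previous and current fortnightly periods.

    Gives the same dates as
    tuple(rrule(WEEKLY, interval=2, dtstart=ANCHOR, until=today))[-2:]
    (rrule returns datetimes) once at least two periods have started.
    """
    n = (today - ANCHOR).days // PERIOD.days
    if n < 1:
        raise ValueError("need at least two periods starting on or before `today`")
    current = ANCHOR + n * PERIOD
    return current - PERIOD, current


def is_tuesday_after_period(today: date) -> bool:
    """True on the day after a period's last Sunday, i.e. day 1 of the next period."""
    return (today - ANCHOR).days % PERIOD.days == 1
```

A daily sensor could then yield a RunRequest only when is_tuesday_after_period(date.today()) is True. Setting a run_key, such as the current period start date, stops repeated evaluations on the same day from launching duplicate runs.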
Any other ideas?
saravan kumar
11/04/2022, 6:24 AM
@job(resource_defs={"db_session": db_session})
def job_sample_db_test():
    run_sample_db_test()

job_sample_db_test.execute_in_process(
    run_config={
        "resources": {
            "db_session": {
                "config": {
                    "DB_USER": {"env": "DB_USER"},
                    "DB_PASSWORD": {"env": "DB_PASSWORD"},
                }
            }
        }
    }
)

@resource
@contextmanager
def met_db_session(context):
    try:
        user = context.resource_config["DB_USER"]
        password = context.resource_config["DB_PASSWORD"]
        db = os.getenv("DB_DB", "")
        host = context.resource_config["DB_HOST"]
        port = int(os.getenv("DB_PORT", 54332))
        db_connection = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
        session_maker = sessionmaker(bind=db_connection)
        session = session_maker()
        yield session
    finally:
        db_connection.close()
Whatever uses os.getenv gets the proper values, but context.resource_config just gets {"env": "DB_USER"} without actually reading DB_USER from the environment. What am I missing? The secrets are Kubernetes secrets and are available via os.getenv.
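In Dagster, the {"env": "NAME"} form is only resolved for config fields typed as StringSource (or IntSource / BoolSource). A bare @resource with no config_schema hands the dict through untouched, which matches the behavior described. Declaring, for example, @resource(config_schema={"DB_USER": StringSource, "DB_PASSWORD": StringSource, "DB_HOST": StringSource}) should make context.resource_config contain the resolved strings. Note also that the run_config above does not supply DB_HOST, which the resource reads. The stdlib sketch below only mimics that resolution step so the resulting URL can be checked. resolve_source and build_postgres_url are hypothetical helpers, not Dagster APIs.

```python
import os
from typing import Any, Mapping
from urllib.parse import quote_plus


def resolve_source(value: Any) -> str:
    """Mimic what a StringSource field does with its value (hypothetical helper).

    A plain string is returned as-is; {"env": "NAME"} is replaced by the
    value of the NAME environment variable.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, dict) and set(value) == {"env"}:
        name = value["env"]
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]
    raise TypeError(f"unsupported config value: {value!r}")


def build_postgres_url(config: Mapping[str, Any], db: str, port: int) -> str:
    """Build a postgresql:// URL for SQLAlchemy from resource config values."""
    # URL-encode credentials so characters like '@' or spaces don't break the URL.
    user = quote_plus(resolve_source(config["DB_USER"]))
    password = quote_plus(resolve_source(config["DB_PASSWORD"]))
    host = resolve_source(config["DB_HOST"])
    return f"postgresql://{user}:{password}@{host}:{port}/{db}"
```

Separately, a SQLAlchemy Engine has no close() method. Cleanup would normally call session.close() and then db_connection.dispose().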
I am just looking for a resource that gets me a SQLAlchemy db connection, so if there is a better way, I am down for it...
Levan
11/04/2022, 7:57 AM
asset_name and table_name for an asset definition with an io_manager? I’ve got 2 tables with the same name in different dbs/schemas, and defining them in different dagster repos raises a duplicated asset name error.