Thomas Mignon
12/17/2021, 8:56 AM
Thomas Mignon
12/17/2021, 8:59 AM
from dagster import repository
from jobs import compact_monthly, process_daily
@repository
def process_daily_compact_monthly_repository():
    return [compact_monthly, process_daily]
and compact_monthly looks like this:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    compacted_event = compact_event_session(compact_partition(compact_purify()))
    compact_export_oceanography(compact_business_oceanography(compacted_event))
    compact_export_sextant(compact_business_sextant(compacted_event))
    compact_export_seanoe(compact_business_seanoe(compacted_event))
    compact_export_archimer(compact_business_archimer(compacted_event))
and process_daily looks like this:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    event = event_session(partition(purify()))
    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
They are 2 separate jobs, so why does this error happen?
(I'm using the workspace.yaml mechanism to specify where my repos are, so that I can launch dagit without parameters.)
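If the error (not quoted above) is about two definitions sharing a name, which the snippets suggest since both @job decorators use name='template_runner_job', a minimal sketch of a fix would be to give each job a distinct name, reusing the ops and resources from the snippets above:

# compact_monthly.py (sketch): distinct job name
@job(
    name='compact_monthly_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def compact_monthly():
    compact_export_oceanography(compact_business_oceanography(compact_event_session(compact_partition(compact_purify()))))

# process_daily.py (sketch): distinct job name
@job(
    name='process_daily_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def process_daily():
    export_oceanography(business_oceanography(event_session(partition(purify()))))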
Jens
12/17/2021, 6:00 PM
Charles Leung
12/17/2021, 7:37 PM
Charles Leung
12/17/2021, 10:52 PM
Traceback (most recent call last):
File "/usr/local/bin/dagit", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 239, in main
cli(auto_envvar_prefix="DAGIT") # pylint:disable=E1120
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1128, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1053, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1395, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 754, in invoke
return __callback(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 119, in ui
**kwargs,
File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 136, in host_dagit_ui
with get_instance_for_service("dagit") as instance:
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/dagster/cli/utils.py", line 12, in get_instance_for_service
with DagsterInstance.get() as instance:
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 387, in get
return DagsterInstance.from_config(dagster_home_path)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 402, in from_config
return DagsterInstance.from_ref(instance_ref)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 423, in from_ref
run_launcher=instance_ref.run_launcher,
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/ref.py", line 264, in run_launcher
return self.run_launcher_data.rehydrate() if self.run_launcher_data else None
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
return klass.from_config_value(self, result.value)
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/launcher.py", line 209, in from_config_value
return cls(inst_data=inst_data, **config_value)
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/launcher.py", line 130, in __init__
kubernetes.config.load_incluster_config()
File "/usr/local/lib/python3.7/site-packages/kubernetes/config/incluster_config.py", line 121, in load_incluster_config
try_refresh_token=try_refresh_token).load_and_set(client_configuration)
File "/usr/local/lib/python3.7/site-packages/kubernetes/config/incluster_config.py", line 54, in load_and_set
self._load_config()
File "/usr/local/lib/python3.7/site-packages/kubernetes/config/incluster_config.py", line 73, in _load_config
raise ConfigException("Service token file does not exist.")
Am I missing a value in values.yaml somewhere?
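The bottom of the traceback shows the K8sRunLauncher calling kubernetes.config.load_incluster_config(), which only succeeds inside a Kubernetes pod that has a service account token mounted. A hedged sketch, assuming this instance is not actually meant to launch runs into Kubernetes, would be to point dagster.yaml at the default launcher instead (if this is a Helm deployment, the more likely cause is a missing or unmounted service account token):

# dagster.yaml (sketch): use the default run launcher when not launching into Kubernetes
run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher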
Jens
12/18/2021, 1:34 PM
medihack
12/18/2021, 5:30 PM
About the RootInputManager: for me it would make sense to use it the same way I use an IOManager. That way I can use the same manager that inherits from both classes, something like this:
class DatabaseManager(RootInputManager, IOManager):
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...


@io_manager(required_resource_keys={"database_client"})
def database_io_manager():
    return DatabaseManager()


@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager(_):
    return DatabaseManager()
But unfortunately, it doesn't seem to work that way. load_input is never called when using DatabaseManager as a root input; I just get a type check error. I guess that's because database_root_manager should not return the RootInputManager itself but the input data. This makes the two so different from each other, when both cases could be combined so nicely. Or am I missing something here?
EDIT:
I found the following workaround, but I still think it is quite ugly.
@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager(context: InputContext):
    manager = DatabaseManager()
    return manager.load_input(context)
In my opinion, @root_input_manager tries to do too much magic here (by creating its own RootInputManager). I would prefer that it work exactly like @io_manager. What do you think?
Todd Hendricks
12/19/2021, 11:10 PM
Dimka Filippov
12/20/2021, 11:35 AM
Mathew Lee
12/20/2021, 3:12 PM
Jeremy Fisher
12/20/2021, 4:57 PM
I'm using pytest-postgresql to spin up a test database for each of my jobs. I can access it as a fixture in a pytest function, where I convert it to a resource like so:
def test_foo(postgresql):
    engine = create_engine(
        f"postgresql+psycopg2://{postgresql.info.user}:@{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}",
        echo=False,
        poolclass=NullPool,
    )

    @resource
    def test_data_warehouse(context):
        return Postgres(engine=engine)

    my_job = my_graph.to_job(resource_defs={"data_warehouse": test_data_warehouse})
This worked until I added a Dagstermill solid: `dagster.core.errors.DagsterInvariantViolationError: Reconstructable target was not a function returning a job definition, or a job definition produced by a decorated function. If your job was constructed using ``GraphDefinition.to_job``, you must wrap the ``to_job`` call in a function at module scope, ie not within any other functions. To learn more, check out the docs on ``reconstructable``: https://docs.dagster.io/_apidocs/execution#dagster.reconstructable`
But I cannot define the job at the module level, because the postgres engine is only available to me as a fixture.
Any workarounds?
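A sketch of one possible workaround (an assumption, not a verified fix for the Dagstermill case): define the resource and the job at module scope and pass the fixture's connection details in through resource config, so that reconstructable can find the definition; Postgres and my_graph are the user's own objects from the snippet above:

from dagster import resource

@resource(config_schema={"url": str})
def test_data_warehouse(context):
    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool
    # Postgres is the wrapper class from the snippet above
    return Postgres(engine=create_engine(context.resource_config["url"], echo=False, poolclass=NullPool))

# module scope, so the job can be reconstructed by name
my_job = my_graph.to_job(resource_defs={"data_warehouse": test_data_warehouse})

def test_foo(postgresql):
    url = (
        f"postgresql+psycopg2://{postgresql.info.user}:@"
        f"{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}"
    )
    result = my_job.execute_in_process(
        run_config={"resources": {"data_warehouse": {"config": {"url": url}}}}
    )
    assert result.success

Whether the Dagstermill op still requires an out-of-process run is a separate question; the sketch only removes the module-scope obstacle.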
Jens
12/20/2021, 8:04 PM
Chris Chan
12/20/2021, 9:04 PM
Stefan Adelbert
12/21/2021, 4:44 AM
Regarding dagster.yaml: I have previously had something like the following in my dagster.yaml, which allowed me to use environment variables to control some details of how a run worker will log:
python_logs:
  dagster_handler_config:
    handlers:
      gcpHandler:
        (): my_logging.Handler
        level: DEBUG
        formatter: gcpFormatter
    formatters:
      gcpFormatter:
        (): my_logging.Formatter
        client:
          env: DAGSTER_CLIENT
        environment:
          env: DAGSTER_ENVIRONMENT
It worked really nicely.
I switched back to using this technique today, but the variable substitution doesn't seem to be working (0.13.12) - the client and environment parameters to my_logging.Formatter.__init__ are dicts like {"env": "DAGSTER_CLIENT"}, as if the variable substitution has been skipped.
I am using variable substitution in other parts of the dagster.yaml and that seems to be working fine, e.g.
compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir:
      env: DAGSTER_LOCAL_COMPUTE_LOG_MANAGER_DIRECTORY
I'm hoping that I'm simply doing something stupid, but I can't figure it out...
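A hedged sketch of a workaround, assuming the dagster_handler_config subtree is handed to logging.config.dictConfig without Dagster's env substitution being applied: resolve the variables inside the custom formatter itself (my_logging is the user's own module, so the constructor below is illustrative):

# my_logging.py (sketch): accept either a plain string or the {"env": "VAR"} mapping
import logging
import os

class Formatter(logging.Formatter):
    def __init__(self, client=None, environment=None, **kwargs):
        super().__init__(**kwargs)
        self.client = self._resolve(client)
        self.environment = self._resolve(environment)

    @staticmethod
    def _resolve(value):
        # fall back to reading the environment variable ourselves if the
        # {"env": ...} indirection was passed through unresolved
        if isinstance(value, dict) and "env" in value:
            return os.environ.get(value["env"], "")
        return value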
Martin Carlsson
12/21/2021, 9:46 AM
I have a job (see attached image) where I get secrets from Azure Key Vault. The way I've implemented it is that each secret gets its own op. Is that the best way of doing it?
This is the way I've implemented Azure Key Vault … however, I'm not sure that this is the best way:
import os

from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient
from dagster import EventMetadata, Failure, op


class Secrets:
    def __init__(self) -> None:
        self.__vault_url = os.environ.get("AZURE_KEY_VAULT_URI")
        self.__credential = ClientSecretCredential(
            tenant_id=os.environ.get("AZURE_TENANT_ID"),
            client_id=os.environ.get("AZURE_KEY_VAULT_SERVICE_PRINCIPAL_CLIENT_ID"),
            client_secret=os.environ.get(
                "AZURE_KEY_VAULT_SERVICE_PRINCIPAL_CLIENT_SECRET"
            ),
        )
        self.__client = SecretClient(
            vault_url=self.__vault_url, credential=self.__credential
        )

    def get_secret(self, secret_name: str) -> str:
        key_vault_secret_name = {
            "azure_storage_connecting_string": "StorageAccountConnectionString",
            "eon_client_id": "EonClientId",
            "eon_client_secret": "EonClientSecret",
            "for_testing_key_vault": "ForTestingKeyVault",
        }[secret_name]
        try:
            return self.__client.get_secret(key_vault_secret_name).value
        except Exception as e:
            raise Failure(
                description="Error getting data from azure key vault",
                metadata={"Error message:": EventMetadata.text(str(e))},
            )


@op()
def get_secret_azure_storage_connecting_string() -> str:
    """Retrieve Azure Storage Connection String from Azure Key Vault"""
    return Secrets().get_secret("azure_storage_connecting_string")


@op()
def get_secret_eon_client_id() -> str:
    """Retrieve Eon client id from Azure Key Vault"""
    return Secrets().get_secret("eon_client_id")


@op()
def get_secret_eon_client_secret() -> str:
    """Retrieve Eon client secret from Azure Key Vault"""
    return Secrets().get_secret("eon_client_secret")


@op()
def get_secret_for_testing_key_vault() -> str:
    """Retrieve testing key from Azure Key Vault"""
    return Secrets().get_secret("for_testing_key_vault")
Chris Evans
12/21/2021, 4:13 PM
dbt log lines like Found 100 models...; Concurrency: ... 1 of 3 START.... don't seem to be coming back in any request I make, despite logging in the ops being toggled on. Is this the correct behaviour, or am I missing something?
Jorge Sánchez (Jorjatorz)
12/21/2021, 4:42 PM
… model_id UUID) and added this information to the AssetMaterialization metadata. Then the asset_sensor read this metadata and yielded a run for the target pipeline with a configuration filled from this metadata. In other words, there was a "root" pipeline that received the initial configuration, and this config was cascaded down to other pipelines through assets. If a pipeline had to add new config data, it added it to the AssetMaterialization metadata and the next pipeline would make use of it. Usually the config data configured a ResourceDefinition.string_resource(), so the model_id I mentioned before would become a string resource called model_id, making it easy to get from all solids that required it.
I am currently migrating to the new framework, and these interconnected pipelines have now become graphs. The job provides the general configuration to the main resources, but there are some resources (like the model_id) that are created in a subgraph, and subsequent graphs/solids will make use of them. The approach I am currently taking is that each graph returns a config_dict and the next graph gets this config_dict as input. So the graph that creates a model will insert the model_id into it, and subsequent graphs will have access to it, BUT this time they get it through this config_dict instead of a resource.
I would like to know your thoughts about this approach. Ideally I would like to be able to insert resources (for example the model_id resource) or be able to modify the ResourceDefinition.string_resource() from a solid, so the model_id starts empty and is then populated with the id. Thank you 😄
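A minimal sketch of the approach described above (all names hypothetical), with one graph returning a config_dict that the next graph takes as an input:

from dagster import graph, op

@op
def train_model() -> dict:
    model_id = "some-generated-uuid"  # produced by the training step
    return {"model_id": model_id}

@op
def evaluate_model(config_dict: dict):
    model_id = config_dict["model_id"]
    # ... use model_id here instead of reading it from a resource ...

@graph
def training_graph():
    return train_model()

@graph
def evaluation_graph(config_dict):
    evaluate_model(config_dict)

@graph
def full_graph():
    evaluation_graph(training_graph())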
geoHeil
12/21/2021, 4:55 PM
Having a DynamicOutput job in dagster, I wonder how to restart it:
1. I can nicely run all the ops.
2. But I do not want to. Instead, I first want to run only return_some_list
3. and then (from the launchpad):
- re-trigger generate_subtasks (i.e. use the first task as a sort of cache during development of the code inside generate_subtasks, or any downstream operations). A run_from_selected afterwards does not add in the missing dynamic operations.
- when trying to run the next step (generate_subtasks) solo (after successfully running the first step), I am asked for configuration values (which I am NOT asked for when running the full pipeline):
Value for selector type at path root:ops:generate_subtasks:inputs:nums must be a dict
Why am I asked? And what do I need to fill in to get it to run?
import os
from typing import List

from dagster import (
    DagsterType,
    DynamicOut,
    DynamicOutput,
    Field,
    In,
    Out,
    get_dagster_logger,
    graph,
    op,
)


@op
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@op(out=DynamicOut(int))
def generate_subtasks(nums):
    get_dagster_logger().info(f"Found {nums} numbers")
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@op(ins={'input_number': In(int)}, out=Out(str))
def hello_dynamic(input_number: int):
    get_dagster_logger().info(f"Found {input_number} cereals")
    return f"Hello, {input_number} Number!"


@graph
def dynamic_dummy():
    items = return_some_list()
    tasks = generate_subtasks(items)
    tasks.map(hello_dynamic)


say_hello_job = dynamic_dummy.to_job()
Bernardo Cortez
12/21/2021, 5:02 PM
I have an op with an optional input declared as 'optional_input': In(DataFrame, default_value=None). This works when I pass a DataFrame to the op. However, if I pass no input, it raises this error:
DagsterTypeCheckDidNotPass: Type check failed for step input "optional_input" - expected type "PySparkDataFrame". Description: Value of type <class 'NoneType'> failed type check for Dagster type PySparkDataFrame, expected value to be of Python type DataFrame.
Besides, if I implement it as 'optional_input': In(Any, default_value=None), it would raise an error when passing a DataFrame to the op.
Can someone help me? Thanks!
Marc Keeling
12/21/2021, 6:33 PM
arvi
12/22/2021, 7:56 AM
Thomas Mignon
12/22/2021, 10:11 AM
from dagster import fs_io_manager, job, Any
from dagster_dask import dask_executor
from pathlib import Path
from semaphore_scripts import template_runner
import os
from dagster import Nothing, In, op


@op
def adder_purified(purified_ftp: Any, purified_ftp2: Any, purified_eftp1: Any, purified_eftp2: Any, purified_httpvpublic: Any, purified_httpvpublicnew: Any) -> Any:
    return purified_ftp, purified_ftp2, purified_eftp1, purified_eftp2, purified_httpvpublic, purified_httpvpublicnew


def template_runner_op(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        args (any): One or more arguments used to generate the new op
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.

    Returns:
        function: The new op.
    """
    @op(
        name=name,
        ins=ins or {"start": In(Nothing)},
        config_schema={
            "command": str,
            "command_parameter": str,
            "semaphore_audience_version": str,
            "workspace": str
        },
        required_resource_keys={"io_manager"},
        **kwargs
    )
    def _template_runner_op(context):
        workspace: Path = Path(
            os.path.join(context.op_config["workspace"], f'{context.op_config["command"]}-{os.environ["PBS_JOBID"]}'))
        template_runner.TemplateRunner(
            command=context.op_config["command"],
            version=context.op_config["semaphore_audience_version"],
            command_parameter=context.op_config["command_parameter"],
            workspace=workspace,
            skip_signal=True
        ).run(exit=False)

    return _template_runner_op
purify_ftp = template_runner_op(name="purify_ftp")
purify_ftp2 = template_runner_op(name="purify_ftp2")
purify_httpvpublic = template_runner_op(name="purify_httpvpublic")
purify_httpvpublicnew = template_runner_op(name="purify_httpvpublicnew")
purify_eftp1 = template_runner_op(name="purify_eftp1")
purify_eftp2 = template_runner_op(name="purify_eftp2")
partition = template_runner_op(name="partition")
event_session = template_runner_op(name="event_session")
#Business/Export
business_oceanography = template_runner_op(name="business_oceanography")
export_oceanography = template_runner_op(name="export_oceanography")
business_sextant = template_runner_op(name="business_sextant")
export_sextant = template_runner_op(name="export_sextant")
business_seanoe = template_runner_op(name="business_seanoe")
export_seanoe = template_runner_op(name="export_seanoe")
business_archimer = template_runner_op(name="business_archimer")
export_archimer = template_runner_op(name="export_archimer")
export_smoswind = template_runner_op(name="export_smoswind")
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    purified_ftp = purify_ftp()
    purified_ftp2 = purify_ftp2()
    purified_eftp1 = purify_eftp1()
    purified_eftp2 = purify_eftp2()
    purified_httpvpublic = purify_httpvpublic()
    purified_httpvpublicnew = purify_httpvpublicnew()
    purified = adder_purified(purified_ftp, purified_ftp2, purified_eftp1, purified_eftp2, purified_httpvpublic, purified_httpvpublicnew)
    event = event_session(partition(purified))
    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
This gives me the graph in the first picture, which seems like what I want, even if we see the adder_purified op in the graph. But it also gives me this error:
2021-12-22 10:02:09 - dagster - ERROR - template_runner_job - 911a185d-cb6a-4fef-ad6a-2ef75a28ac49 - 22690 - RUN_FAILURE - Execution of run for "template_runner_job" failed. An exception was thrown during execution.
dagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
The only way I have found so far to run my graph is to modify template_runner_job so that the beginning runs sequentially instead of in parallel:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    event = event_session(partition(purify_ftp(purify_ftp2(purify_eftp1(purify_eftp2(purify_httpvpublic(purify_httpvpublicnew())))))))
    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
But this leads me to the graph in the second picture, which is not what I want.
Thomas Mignon
12/22/2021, 10:12 AM
Chris Evans
12/22/2021, 4:12 PM
The <date/time-partition>_partitioned_config decorators are missing the respective offset params (minute_of_hour=None, hour_of_day=None, day_of_week=None, day_of_month=None) that are present on the build_schedule_from_partitioned_job helper function. Unless I'm mistaken, without these params we get into a weird scenario where, if we need an offset, we have to apply it against the injected date/time partition when building the run_config function. For example, for a weekly partitioned config the partitions seem to be generated based on the cron 0 0 * * 0; if I actually care about weekly partitions on Mondays, then I would have to timedelta the injected date/time partition to create 0 0 * * 1. Furthermore, we will have partitions displayed in Dagit that aren't relevant, e.g. partition 2021-05-02 (Sunday) when I actually care about 2021-05-03.
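A sketch of the timedelta workaround described above, assuming the decorated function receives the partition window's start/end datetimes (the op and config keys are hypothetical):

from datetime import timedelta

from dagster import weekly_partitioned_config

@weekly_partitioned_config(start_date="2021-05-02")
def monday_config(start, _end):
    # shift the Sunday-anchored partition start to the Monday we actually care about
    monday = start + timedelta(days=1)
    return {"ops": {"my_op": {"config": {"date": monday.strftime("%Y-%m-%d")}}}}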
DK
12/22/2021, 8:12 PM
@op
def dbt_sales_models():
    return ['sales']  # what models to run in dbt?


@op(
    ins={
        "sales": In(root_manager_key="warehouse_loader", metadata={"table": "sales"},)
    },
    out=Out(
        dagster_type=DataFrame, io_manager_key="warehouse_io_manager", metadata={"table": "sales_transformed"}
    ),
)
def transform_sales_data(sales: DataFrame):
    sales_transformed = sales.groupby(['year', 'month'])['gross_adjusted'].sum()
    return Output(
        sales_transformed,
        metadata={"# periods": sales_transformed.shape[0]}
    )


@graph
def run_dbt_sales():  # this will be used in a job that runs on a schedule
    dbt(dbt_sales_models())


@graph
def load_refresh_sales():  # this will be in a job that runs when needed
    run_dbt_sales()  # need to call this to make sure we have the latest data
    # run this only after run_dbt_sales() has finished
    transform_sales_data()  # operate on the latest base sales data :: THIS WILL RUN BEFORE DBT IS FINISHED, how to block until run_dbt_sales is done?
    # transform_sales_data(run_dbt_sales())
    # ^ this requires the creation of an input argument in transform_sales_data,
    # how to avoid this and just create a dependency structure?
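A minimal sketch of one way to get the ordering without inventing a data argument: reuse the Nothing-input pattern (the same In(Nothing) trick that appears elsewhere in this thread), so transform_sales_data waits on the dbt graph's output without receiving a value from it; the extra input name and the dbt output below are assumptions:

from dagster import In, Nothing, graph, op

@op(
    ins={
        "sales": In(root_manager_key="warehouse_loader", metadata={"table": "sales"}),
        "after_dbt": In(Nothing),  # ordering-only dependency, no value is passed
    },
)
def transform_sales_data(sales):
    ...

@graph
def run_dbt_sales():
    # return the dbt op's result so downstream graphs can depend on it
    return dbt(dbt_sales_models())

@graph
def load_refresh_sales():
    dbt_done = run_dbt_sales()
    # transform_sales_data only starts once run_dbt_sales has finished
    transform_sales_data(after_dbt=dbt_done)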
Mykola Palamarchuk
12/22/2021, 8:15 PM
geoHeil
12/22/2021, 9:45 PM
In the tutorial, return users_df directly outputs the dataframe - so it is clear to me how the data is shared from op to op. But https://docs.dagster.io/tutorial/advanced-tutorial/materializations uses yield Output(None). Assuming I have created an asset with key `assset_foo_1` in the directory output/foo:
How can I (1) link a second job using only the asset (key) as an input (i.e. like select * from table, where the metastore knows the path details), and (2) have the 2nd op/job re-trigger in case the asset was updated?
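A hedged sketch of (2), using an asset sensor keyed on the materialized asset to kick off the second job whenever a new materialization is observed (the job name and run config are illustrative):

from dagster import AssetKey, RunRequest, asset_sensor

@asset_sensor(asset_key=AssetKey("assset_foo_1"), job=second_job)
def asset_foo_1_sensor(context, asset_event):
    # fire a run of second_job for each new materialization of the asset
    yield RunRequest(run_key=context.cursor, run_config={})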
Qwame
12/22/2021, 10:20 PM
I'm getting a dagster.core.executor.child_process_executor.ChildProcessCrashException:
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 163, in execute
event_or_none = next(step_iter)
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 268, in execute_step_out_of_process
for ret in execute_child_process_command(command):
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\child_process_executor.py", line 157, in execute_child_process_command
raise ChildProcessCrashException(exit_code=process.exitcode)
I don't quite understand what happened. Any help?
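A debugging sketch, assuming the crash comes from the multiprocess executor's child process dying (for example from running out of memory): building a variant of the job with the in-process executor often surfaces the underlying exception directly in the parent process (my_graph is a placeholder for the user's own graph):

from dagster import in_process_executor

# temporary single-process variant of the job for debugging
debug_job = my_graph.to_job(name="debug_job", executor_def=in_process_executor)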
Rasheed Elsaleh
12/22/2021, 11:00 PM
paul.q
12/23/2021, 12:28 AM
We used to work with PartitionSetDefinition, create_schedule_definition and pipeline. We used to have a PartitionSetDefinition with a partition_fn that created a list of Partition objects for a date range, but only Mon-Fri. We were able to use this to create different PartitionSet objects for different pipelines, as well as create a schedule using create_schedule_definition on a PartitionSet object.
Now, I have a Job for my Graph and its config is coming from a daily_partitioned_config - which doesn't exclude weekends as I would like. Should I be messing with dynamic_partition_config to achieve this? I also use build_schedule_from_partitioned_job to have a schedule for the job. I was able to get the partitions looking the way I wanted using dynamic_partition_config, but build_schedule_from_partitioned_job returned this error, where get_effective_dates1 returns a set of date strings of the form `%Y-%m-%d`:
dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) with type <class 'dagster.core.definitions.partition.DynamicPartitionsDefinition'>.
Thanks
Paul
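A sketch of one possible direction, assuming it is acceptable to keep daily partitions and filter at the schedule level rather than inside the partition set: define the schedule by hand with a weekday-only cron instead of build_schedule_from_partitioned_job (my_job and config_for_date are placeholders for the user's own job and config builder):

from dagster import schedule

@schedule(cron_schedule="0 0 * * 1-5", job=my_job)  # midnight, Monday to Friday only
def weekday_schedule(context):
    date_str = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # config_for_date builds the run config for a given date string
    return config_for_date(date_str)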