Ismael Rodrigues
10/15/2022, 12:39 AMYassine Amzil
10/15/2022, 9:40 AMBen Andersen-Waine
10/16/2022, 10:11 AMgeoHeil
10/17/2022, 7:18 AMJosh Clark
10/17/2022, 7:45 AMNitin Madhavan
10/17/2022, 9:21 AMFilip Radovic
10/17/2022, 11:46 AMLucas Gabriel
10/17/2022, 12:02 PMZachary Bluhm
10/17/2022, 1:35 PMAdam Bloom
10/17/2022, 3:20 PMRepositoryData
class instance for this? What confuses me is that I don't see where the asset definitions are actually populated? https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/repository_definition.py#L468. There's also the dictionary style of repository definitions...but those don't look like they support assets either? https://github.com/dagster-io/dagster/blob/1f68be035588dddb4a02c82ad008d1c41dc8241[…]ules/dagster/dagster/_core/definitions/repository_definition.py. The CachingRepositoryData
class isn't exported, but appears to supports assets. https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/repository_definition.py#L482
What's the recommended way to proceed here? Switching to assets from ops brought the startup time for dagster-daemon/dagit up to almost 1 minute from nearly instantaneous, and with the k8s job launcher, that will add almost a minute to every job run.Shondace
10/17/2022, 6:40 PM@op
be included in the config_schema
of another @op
. Backstory, I'm trying to write a sensor that runs a job when the timestamp is greater than the some value. However, the timestamp is the output of a. @op
function. So, I'm a bit confused because the RunRequest has run_config
but there wasn't a config_schema
used. Any thoughts?Adam Bloom
10/17/2022, 7:59 PMSensor function returned an empty result
rather than the message passed in the skip reason. Am I missing something? This is what is shown in the examples, but just doesn't seem to work in reality. Note that I've mostly been using multi asset sensors, so perhaps this works with regular asset sensors but not multi asset?Fraser Marlow
10/17/2022, 9:28 PMI was having a hard time to open/connect to our dagster localhost:3000. I think we are using to old version of dagster since we started using it since 2020.
On connecting to dagster localhost, if there are pipelines that are unsuccessful/failed, we tend to run it manually on dagster UI via localhost:3000.
Steps on connecting to dagster: (1) open SDK shell terminal; (2) authenticate, (3) paste the ssh link from GCP VM instances
Before, I was able to connect and open dagster localhost:3000, but now, there’s seem a problem and I can’t find a way to solve/debug it.
I noticed in the terminal some warnings (image 2) and don’t even know if that’s related to my problem.
Is/are there a way for me to debug or connect to the dagster UI/localhost:3000? (image 1)
I’ve recently heard of the dasgter cloud, if we had a existing repo, can we connect it to the dagster cloud even if the dagster version was more early than the recent ones? Also, is it possible to get a demo of dagster and dagster cloud somehow?
Chris Comeau
10/17/2022, 11:10 PMCJ
10/18/2022, 1:00 AMSanidhya Singh
10/18/2022, 6:26 AMdagster.yaml
from $DAGSTER_HOME
via GraphQL?
Looking for something similar to how we can reload a Workspace with a GraphQL request.Ben Andersen-Waine
10/18/2022, 11:39 AMdagster_databricks
and the databricks_pyspark_step_launcher
?
I can see one for EMR: https://github.com/dagster-io/dagster/blob/1.0.13/examples/with_pyspark_emr/with_pyspark_emr/repository.py
It would be nice to see an example of the config required to pass to:
"pyspark_step_launcher": databricks_pyspark_step_launcher.configured( {
# ???
}
),
Riccardo Tesselli
10/18/2022, 12:01 PM2022-10-18 11:00:04 UTC-634e8734.2a168-LOG: connection authorized: user=dagster_proddatabase=dagster_prod SSL enabled (protocol=TLSv1.2, cipher=ECDHE-RSA-AES256-GCM-SHA384, compression=off)
2022-10-18 11:00:05 UTC-634e8734.2a168-LOG: could not receive data from client: An existing connection was forcibly closed by the remote host.
any suggestions?Aman Saleem
10/18/2022, 12:35 PMThanakorn Kitsawat
10/18/2022, 3:26 PMMatthew Whiteside
10/18/2022, 4:05 PMstorage:
postgres:
postgres_db:
username: { DAGSTER_PG_USERNAME }
password: { DAGSTER_PG_PASSWORD }
hostname: { DAGSTER_PG_HOSTNAME }
db_name: { DAGSTER_PG_DB }
port: 5432
But in the dagster-postgres library documentation: https://docs.dagster.io/_apidocs/libraries/dagster-postgres it lists each as separate:
event_log_storage:
module: dagster_postgres.event_log
class: PostgresEventLogStorage
config:
postgres_db:
username: { username }
password: { password }
hostname: { hostname }
db_name: { db_name }
port: { port }
What's the difference? Is storage
config equivalent to adding the three: event_log_storage
, run_storage
and schedule_storage
?Dusty Shapiro
10/18/2022, 4:50 PMdagster-user-deployments.deployments[0].env
be sufficient? Specifically, in this Slack example, it’s referencing a SLACK_TOKEN from the environment, but I just want to be 100% sure where these variables should be loaded.Bennett Norman
10/18/2022, 6:18 PMdagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
and this warning:
No previously stored outputs found for source StepOutputHandle(step_key='asset1', output_name='result', mapping_key=None). This is either because you are using an IO Manager that does not depend on run ID, or because all the previous runs have skipped the output in conditional execution.
I only get this error when I rerun assets. Is it possible to create an IO Manager that doesn’t overwrite assets on each run? I’m using context.get_identifier()
to get the run_id in the IO Manager.Ruoyu Qian
10/18/2022, 7:01 PMmy_sensor
#my sensor factory
def create_vero_event_sensor(
request_jobs: Optional[List[JobDefinition]],
run_status: DagsterRunStatus,
monitored_jobs: Optional[List[JobDefinition]],
sensor_name: str,
) -> RunStatusSensorDefinition:
@run_status_sensor(
monitored_jobs=monitored_jobs,
run_status=run_status,
request_jobs=request_jobs,
)
def my_sensor (context: RunStatusSensorContext):
message = f'Job "{context.dagster_run.job_name}" succeeded.'
print(message)
return my_sensor
# loops that factory loop through
test_sensors: List[Tuple[UnresolvedAssetJobDefinition, JobDefinition, str]] = [
(
my_dbt_test1,
job1,
"sensor_name1",
),
(
my_dbt_test2,
job2,
"sensor_name2",
),
]
dbt_sensors: List[RunStatusSensorDefinition] = []
for my_test, jobs, sensor_name in test_sensors:
sensor = create_vero_event_sensor(
monitored_jobs=[my_test],
run_status=DagsterRunStatus.SUCCESS,
request_jobs=[jobs],
)
dbt_sensors.append(sensor)
Ryan Navaroli
10/18/2022, 7:14 PMasyncio
for everything, but dagster seems to lack support for asyncio
with `@sensor`s. So basically calling any nats function (like creating a connection) requires an await
keyword, which can only be used within an async def
function. But dagster sensor needs to be set up as a synchronous def
function, which makes it very difficult to integrate with the nats sdk.
Any suggestions on how have an async @sensor
or how to work around?Stanley Yang
10/18/2022, 7:43 PMCraig Austin
10/18/2022, 9:17 PMop_tags
on an @asset
to configure pod_spec_config
for the K8sRunLauncher
doesn't seem to be working. Is this something that should be possible?James Hale
10/18/2022, 11:13 PMdagster-cloud/alert_emails
tag, but no alert emails are being sent.
Docs: https://docs.dagster.io/dagster-cloud/account/setting-up-alerts#using-system-tags-to-configure-alert-emails
Any suggestions?Stefan Adelbert
10/18/2022, 11:59 PMio_manager
as fs_io_manager.configured({"base_dir": "storage"})
and the op outputs are being pickled as expected.
I'd like to be able to run only a subset of the job's ops, but using the outputs from a previous job as the inputs, similar to what I can do in dagit
where I can re-execute selected ops. A justification would be not having to run an expensive API call again and again.
❔ When calling execute_in_process
, I know I can use op_selection
to specify the ops I want to run, but how do I reference the job whose op outputs should be used?
https://docs.dagster.io/_apidocs/execution#executing-jobsReid Beels
10/19/2022, 1:34 AMAssetDefinition.from_graph
no longer have independent keys and I don’t see an obvious way to set them.
e.g. this results in multiple named {env}_db
assets due to the use of configured
refresh_assets = [
AssetsDefinition.from_graph(
refresh_db.configured(
{"environment_name": env}, name=f"{env}_db"
),
resource_defs={
"db_engine": resources.dev_db_engine,
},
group_name="dev_env_databases",
)
for env in ENVS
]
this results in duplicate asset keys because there’s no way to set a name
or key
in from_graph
refresh_assets = [
AssetsDefinition.from_graph(
refresh_db,
resource_defs={
"db_engine": resources.dev_db_engine.configured({"environment_name": env}),
},
group_name="dev_env_databases",
)
for env in ENVS
]
using with_resources
instead of the experimental resource_defs
also doesn’t seem to provide a way modify the asset name/key
anything I’m missing?