Stefan Adelbert
11/04/2022, 3:58 AM
tuple(rrule(WEEKLY, interval=2, dtstart=datetime.date(2022, 7, 4), until=date))[-2:]
where 04/07/2022 is a known historic pay period start date.
I'm considering a sensor which runs once a day and checks whether "today" is the Tuesday after the most recent period start. If so, yield a RunRequest.
Any other ideas?
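A minimal sketch of that daily-check idea, assuming dateutil's rrule and a job called my_job (the job name and the +1 day offset to Tuesday are illustrative):
from datetime import date, datetime, timedelta
from dateutil.rrule import rrule, WEEKLY
from dagster import RunRequest, SkipReason, sensor

ANCHOR = datetime(2022, 7, 4)  # the known historic pay period start (a Monday)

@sensor(job=my_job, minimum_interval_seconds=3600)  # my_job assumed defined elsewhere
def payday_sensor(context):
    today = date.today()
    # most recent fortnightly period start on or before now
    period_start = list(rrule(WEEKLY, interval=2, dtstart=ANCHOR, until=datetime.now()))[-1].date()
    if today == period_start + timedelta(days=1):  # the Tuesday after the Monday start
        return RunRequest(run_key=today.isoformat())  # run_key keeps it to one run per day
    return SkipReason(f"{today} is not a pay day")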
saravan kumar
11/04/2022, 6:24 AM
@job(resource_defs={"db_session": db_session})
def job_sample_db_test():
    run_sample_db_test()

job_sample_db_test.execute_in_process(
    run_config={
        "resources": {
            "db_session": {
                "config": {
                    "DB_USER": {"env": "DB_USER"},
                    "DB_PASSWORD": {"env": "DB_PASSWORD"},
                }
            }
        }
    }
)
@resource
@contextmanager
def met_db_session(context):
    try:
        user = context.resource_config["DB_USER"]
        password = context.resource_config["DB_PASSWORD"]
        db = os.getenv("DB_DB", "")
        host = context.resource_config["DB_HOST"]
        port = int(os.getenv("DB_PORT", 54332))
        db_connection = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
        session_maker = sessionmaker(bind=db_connection)
        session = session_maker()
        yield session
    finally:
        db_connection.close()
Whatever is using os.getenv gets the proper values, but context.resource_config just gets the literal {"env": "DB_USER"} dict without actually reading DB_USER from the environment. What am I missing? The secrets are Kubernetes secrets and are available via os.getenv.
I am just looking for a resource that gets me a SQLAlchemy DB connection, so if there is a better way, I am down for it...
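For what it's worth, {"env": "DB_USER"} placeholders are only resolved for config fields declared with a source type such as StringSource; with no config_schema on the resource, the dict arrives verbatim. A sketch of the fix, reusing the resource above:
from dagster import StringSource, resource

@resource(config_schema={
    "DB_USER": StringSource,       # accepts a literal value or {"env": "DB_USER"}
    "DB_PASSWORD": StringSource,
    "DB_HOST": StringSource,
})
def met_db_session(context):
    user = context.resource_config["DB_USER"]          # the resolved env var value, not the raw dict
    password = context.resource_config["DB_PASSWORD"]
    ...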
Levan
11/04/2022, 7:57 AM
Is there a way to have a different asset_name and table_name for an asset definition with an io_manager? I've got 2 tables with the same name in different db/schemas, and defining them in different dagster repos raises an error about duplicated asset names.
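One possible workaround, sketched under the assumption that the IO manager derives db/schema from the key prefix and the table name from the final key component (context.asset_key.path):
from dagster import asset

@asset(key_prefix=["db_one", "schema_a"], name="events")  # asset key: ["db_one", "schema_a", "events"]
def events_a():
    ...

@asset(key_prefix=["db_two", "schema_b"], name="events")  # same table name, distinct asset key
def events_b():
    ...
The two asset keys differ, so there is no duplicate-name error, while both tables keep the trailing name "events".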
Deepa Vasant
11/04/2022, 8:59 AM
Davi
11/04/2022, 9:20 AM
Joshua Smart-Olufemi
11/04/2022, 10:26 AM
FileNotFoundError: [WinError 2] The system cannot find the file specified: 'serial_asset_graph.py'
  File "C:\Users\josh\Desktop\toggle assignment\assignment\venv\lib\site-packages\dagster\_grpc\server.py", line 230, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "C:\Users\josh\Desktop\toggle assignment\assignment\venv\lib\site-packages\dagster\_grpc\server.py", line 104, in __init__
    loadable_targets = get_loadable_targets(
  File "C:\Users\josh\Desktop\toggle assignment\assignment\venv\lib\site-packages\dagster\_grpc\utils.py", line 33, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "C:\Users\josh\Desktop\toggle assignment\assignment\venv\lib\site-packages\dagster\_core\workspace\autodiscovery.py", line 27, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "C:\Users\josh\Desktop\toggle assignment\assignment\venv\lib\site-packages\dagster\_core\code_pointer.py", line 75, in load_python_file
    os.stat(python_file)
I'm a bit spun because I have made no changes to my setup. I would love some help in debugging this.
Matthew Karas
11/04/2022, 12:23 PM
Jordan
11/04/2022, 12:24 PM
@sensor(
    name='my_sensor',
    job=my_job,
    minimum_interval_seconds=30,
)
def my_sensor(context: SensorEvaluationContext):
    try:
        since_key = context.cursor or None
        new_s3_keys = get_s3_keys(
            bucket=bucket,
            prefix=prefix_path,
            since_key=since_key,
        )
        if not new_s3_keys:
            return SkipReason("No new s3 files")
        last_key = new_s3_keys[-1]
        if …:
            context.update_cursor(last_key)
            return [RunRequest(tags=None, run_key=file) for file in new_s3_keys]
        context.update_cursor(last_key)
        return SkipReason("No file corresponds")
    except Exception as err:
        send_notification(err)
        stop_this_sensor()
How do I use DagsterInstance.stop_sensor at this point in the code, and how do I get the instigator_origin_id and selector_id it needs? Thanks in advance
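Getting those two IDs from inside the evaluation function is awkward. An alternative sketch (a swapped-in pattern, not DagsterInstance.stop_sensor itself): let the sensor disable itself through its own cursor, using only the names from the snippet above.
from dagster import SensorEvaluationContext, SkipReason, sensor

DISABLED = "__disabled__"  # hypothetical sentinel value stored in the cursor

@sensor(name='my_sensor', job=my_job, minimum_interval_seconds=30)
def my_sensor(context: SensorEvaluationContext):
    if context.cursor == DISABLED:
        return SkipReason("sensor disabled itself after an earlier failure")
    try:
        ...  # the S3 logic above
    except Exception as err:
        send_notification(err)
        context.update_cursor(DISABLED)  # every later evaluation short-circuits above
        return SkipReason("error encountered; disabling future evaluations")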
Bojan
11/04/2022, 2:11 PM
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...
Bojan
11/04/2022, 2:11 PM
Manish Khatri
11/04/2022, 3:06 PM
I'm loading my dbt models with load_assets_from_dbt_project(…). All the models + lineage show up in Dagit, which is superb :partydagster:
If I wanted to define some of the parent assets (like the one arrowed in the attached picture), how can I do this as a SourceAsset with a TableSchema.from_name_type_dict(…), so we can document the columns and have this linked to the stg_snowflake_query_history dbt asset in the picture? Is this possible?
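A sketch of what that SourceAsset could look like; the key and column names here are hypothetical, and the key has to line up with how the dbt source is named for the lineage edge to appear:
from dagster import AssetKey, MetadataValue, SourceAsset, TableSchema

snowflake_query_history = SourceAsset(
    key=AssetKey(["snowflake", "account_usage", "query_history"]),  # hypothetical; must match the dbt source
    description="Raw Snowflake query history",
    metadata={
        "table_schema": MetadataValue.table_schema(
            TableSchema.from_name_type_dict({"query_id": "varchar", "start_time": "timestamp_ltz"})
        )
    },
)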
Sireesha Kuchimanchi
11/04/2022, 3:42 PM
Sireesha Kuchimanchi
11/04/2022, 3:49 PM
stefan hansan
11/04/2022, 4:09 PM
session = context.resources.request_session
session.post(url, data={'foo': 'bar'})
Would anyone know where this data object resides in the POST request itself? I am trying to unwrap it on the backend of the URL I am calling, but I just can't figure out where it lives. Thank you!
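For reference, a requests-style session sends data={...} as a form-encoded body, not JSON. A small sketch of both sides (the Flask accessors are just an illustration of a backend):
import requests

url = "https://example.com/endpoint"  # placeholder

requests.post(url, data={"foo": "bar"})   # body "foo=bar", Content-Type: application/x-www-form-urlencoded
requests.post(url, json={"foo": "bar"})   # body '{"foo": "bar"}', Content-Type: application/json

# e.g. on a Flask backend:
#   request.form["foo"]        reads the form-encoded variant
#   request.get_json()["foo"]  reads the JSON variant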
Salvador Ribolzi
11/04/2022, 4:28 PM
os.getenv('var') returns nothing even though the var is set for all users; any idea what might be happening? This also causes s3_resource to fail with no credentials (ran both aws configure and added the key/secret as env vars).
We are a bit limited permissions-wise, so accessing the Windows user that's running Dagster is not possible for us (though IT is helping with that).
Aaron Hoffer
11/04/2022, 7:10 PM
Running v1.0.16 and getting sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. errors. However, I'm running it in k8s using a Postgres backend for jobs/schedules/etc. My jobs are using in-memory output storage, and I did notice this recent change: https://github.com/dagster-io/dagster/pull/10154. The exceptions are happening in the pods generated for the Kubernetes jobs. The jobs seem to complete fine, but I'm getting at least one exception per run.
Vineet Balachandran
11/04/2022, 7:31 PM
Nicolas May
11/04/2022, 7:52 PM
Is there a way to have different versions of dagster.yaml? Like one for local dev/test/debug and one for prod deployment?
I see there can be different workspace.yaml-like files thanks to the --workspace CLI option (https://docs.dagster.io/_apidocs/cli#cmdoption-dagster-daemon-run-w).
Jack Yin
11/04/2022, 8:22 PM
I'm trying to use run_request_for_partition to kick off the downstream partitioned jobs, but I'm not sure how to get the partition of the asset that the sensor is looking at
Jack Yin
11/04/2022, 8:22 PM
e.g. with a MultiAssetSensor?
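A sketch of reading the partition off the latest materialization record inside a multi_asset_sensor; the asset and job names are assumptions, and my_partitioned_job would need to share the asset's partitions definition:
from dagster import AssetKey, multi_asset_sensor

@multi_asset_sensor(asset_keys=[AssetKey("upstream_asset")], job=my_partitioned_job)
def my_asset_sensor(context):
    for asset_key, record in context.latest_materialization_records_by_key().items():
        if record is None:
            continue
        partition = record.event_log_entry.asset_materialization.partition  # partition of that materialization
        if partition is not None:
            yield my_partitioned_job.run_request_for_partition(
                partition_key=partition,
                run_key=f"{asset_key.to_user_string()}:{partition}",
            )
    context.advance_all_cursors()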
Selene Hines
11/04/2022, 8:23 PM
kyle
11/04/2022, 10:50 PM
In the assets_pandas_pyspark example, the source asset exists as a local file. In my case I just want to define this as a file on s3.
sfo_q2_weather_sample = SourceAsset(
    key=AssetKey("sfo_q2_weather_sample"),
    description="Weather samples, taken every five minutes at SFO",
    metadata={"format": "csv"},
)
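A sketch of pointing that at S3 instead; the path metadata and the s3_csv_io_manager resource key are assumptions, and the IO manager bound to that key has to know how to fetch and parse the object:
from dagster import AssetKey, SourceAsset

sfo_q2_weather_sample = SourceAsset(
    key=AssetKey("sfo_q2_weather_sample"),
    description="Weather samples, taken every five minutes at SFO",
    metadata={"format": "csv", "path": "s3://my-bucket/sfo_q2_weather_sample.csv"},  # hypothetical bucket/key
    io_manager_key="s3_csv_io_manager",  # a custom IO manager that downloads and parses the CSV
)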
Maksym Domariev
11/05/2022, 4:10 AM
dagster._core.errors.DagsterImportError: Encountered ImportError: No module named 'dask_sample_project' while importing module dask_sample_project. Local modules were resolved using the working directory /Users/<my username>/workspace/flow/nlp/dask_sample_project. If another working directory should be used, please explicitly specify the appropriate path using the -d or --working-directory for CLI based targets or the working_directory configuration option for workspace targets.
The root of the project is correct, any tips?
fahad anwaar
11/05/2022, 6:20 AM
Is there a recommended way to use sqlalchemy? I have checked the dagster-sqlalchemy library, but I'm looking for a more custom approach to using sqlalchemy.
Please share any article. Thanks
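One custom approach, as a minimal sketch (the resource name and config field are illustrative):
from dagster import StringSource, resource
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@resource(config_schema={"connection_url": StringSource})
def sqlalchemy_session(context):
    # dagster treats a yielding resource function as a generator and
    # runs the code after the yield as teardown
    engine = create_engine(context.resource_config["connection_url"])
    session = sessionmaker(bind=engine)()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()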
Qwame
11/05/2022, 9:23 AM
I'm experimenting with IOManagers and I have a question. How do I access, in load_input, the OutputContext metadata values added in handle_output? For e.g.
def handle_output(self, context: OutputContext, obj: Dict) -> None:
    ...
    context.add_output_metadata({"value_to_be_accessed": 'I want to access this value'})

def load_input(self, context: InputContext) -> str:
    context.log.info(context.upstream_output.get_logged_metadata_entries())
I have tried context.upstream_output.metadata and context.upstream_output.get_logged_metadata_entries() and have had no success. How do I do this?
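One thing that should work, as a sketch: metadata declared statically on the Out definition is readable downstream via context.upstream_output.metadata (this sidesteps add_output_metadata rather than reading its runtime values):
from typing import Dict
from dagster import InputContext, Out, OutputContext, op

@op(out=Out(metadata={"value_to_be_accessed": "I want to access this value"}))
def producer() -> Dict:
    ...

# in the IO manager:
def load_input(self, context: InputContext) -> str:
    value = context.upstream_output.metadata["value_to_be_accessed"]  # definition-time metadata
    context.log.info(value)
    ...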
Bojan
11/05/2022, 1:13 PM
> Create an asset to represent the table and pass it downstream. The IO manager for your table asset will then be responsible for how the table is presented to the downstream asset.
I'm using snowflake's io manager from https://docs.dagster.io/_apidocs/libraries/dagster-snowflake.
What I'm having issues with is essentially "materializing" from an existing table in snowflake.
i.e. I have a table my_table in a schema my_schema.
It's not clear to me how to load this table in as an asset and then use it downstream in other assets:
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    return ???
...
Is this even possible?
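A sketch of one way, using a SourceAsset for the existing table instead of an @asset; the snowflake_io_manager resource key is an assumption, and the IO manager loads the table as a DataFrame for downstream assets:
import pandas as pd
from dagster import AssetIn, AssetKey, SourceAsset, asset

my_table = SourceAsset(
    key=AssetKey(["my_schema", "my_table"]),   # the existing Snowflake table; dagster never writes it
    io_manager_key="snowflake_io_manager",     # assumed key the snowflake IO manager is bound to
)

@asset(ins={"my_table": AssetIn(key=AssetKey(["my_schema", "my_table"]))})
def downstream_table(my_table: pd.DataFrame) -> pd.DataFrame:
    # the IO manager loads my_schema.my_table into a DataFrame here
    return my_table.dropna()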
Sireesha Kuchimanchi
11/05/2022, 2:37 PM
Sireesha Kuchimanchi
11/05/2022, 2:37 PM
Sireesha Kuchimanchi
11/05/2022, 2:38 PM
Sireesha Kuchimanchi
11/05/2022, 2:39 PM
Sireesha Kuchimanchi
11/05/2022, 2:39 PM
Kyle Gobel
11/05/2022, 4:58 PM
pip install dagster-snowflake (I think) and then maybe you'll get some better error messages
Sireesha Kuchimanchi
11/06/2022, 1:44 PM