Stefan Adelbert
02/16/2023, 4:27 AM
{
  repositoriesOrError {
    ... on RepositoryConnection {
      nodes {
        jobs {
          runs {
            id
            startTime
            endTime
          }
        }
      }
    }
  }
}
The result looks like
{
  "data": {
    "repositoriesOrError": {
      "nodes": [
        {
          "jobs": [
            {
              "runs": [
                {
                  "id": "25718726-4265-45fb-b92f-ef656ec0703a",
                  "startTime": 1675048818.020318,
                  "endTime": 1675048823.704493
                }
              ]
            }
          ]
        }
      ]
    }
  }
}
The startTime and endTime fields have documentation like
startTime: Float
TYPE DETAILS
The Float scalar type represents signed double-precision fractional values as specified by IEEE 754.
What can we assume about the timezone or locale of those times?
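Those values look like Unix epoch timestamps in seconds, i.e. relative to 1970-01-01 UTC and independent of any locale. A minimal sanity check in Python, assuming that interpretation:
from datetime import datetime, timezone

# Assumption: startTime/endTime are Unix epoch seconds, so they carry no local timezone.
start = datetime.fromtimestamp(1675048818.020318, tz=timezone.utc)
end = datetime.fromtimestamp(1675048823.704493, tz=timezone.utc)

print(start.isoformat())  # 2023-01-30T03:20:18.020318+00:00
print(end - start)        # run duration, roughly 5.7 seconds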
Peter Davidson
02/16/2023, 7:03 AM
Jesper Bagge
02/16/2023, 7:51 AM
Rahul Dave
02/16/2023, 8:40 AM
The @input_manager decorator needs to decorate the function that will instantiate the internal io-manager?
Rahul Dave
02/16/2023, 8:41 AM
load_input in the io_manager
Flavien
02/16/2023, 8:41 AM
dagster.yaml file when exposing it with gRPC?
Phillip Marks
02/16/2023, 10:13 AM
Is there a way to use snowflake_resource on a SourceAsset? I'm trying to run a query on Snowflake rather than using snowflake_pandas_io_manager, as the dataset is too large to hold in local memory as a pandas DataFrame.
iris_harvest_data = SourceAsset(key="IRIS_DATASET", resource_defs=["snow_resource"])
Not sure if I need to use resource_defs / how to configure the Mapping[str, ResourceDefinition].
@asset(required_resource_keys={"snow_resource"})
def example_downstream(IRIS_DATASET):
    IRIS_DATASET.resources.snow_resource.execute_query("SELECT * LIMIT 150")
defs = Definitions(
    assets=[another_dataset, iris_harvest_data, example_downstream],
    resources={
        "snow_pd_io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": os.getenv("SNOWFLAKE_ACCOUNT"),
                "user": os.getenv("SNOWFLAKE_USER"),
                "password": os.getenv("SNOWFLAKE_PASSWORD"),
                "role": "ACCOUNTADMIN",
                "warehouse": "COMPUTE_WH",
                "database": "CORE",
                "schema": "IAM",
            }
        ),
        "snow_resource": snowflake_resource.configured(
            {
                "account": os.getenv("SNOWFLAKE_ACCOUNT"),
                "user": os.getenv("SNOWFLAKE_USER"),
                "password": os.getenv("SNOWFLAKE_PASSWORD"),
                "role": "ACCOUNTADMIN",
                "warehouse": "COMPUTE_WH",
                "database": "DAGSTER_TEST_ASSETS",
                "schema": "PUBLIC",
            }
        ),
    },
)
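For the SourceAsset question above, a rough sketch of one way this is often wired up, assuming the goal is simply to run a query against Snowflake from the downstream asset (the table name in the query and the non_argument_deps link are illustrative assumptions): the resource is attached to the asset that uses it and reached through the context, while the SourceAsset only declares the upstream key.
import os

from dagster import Definitions, SourceAsset, asset
from dagster_snowflake import snowflake_resource

# The SourceAsset only declares that the upstream table exists.
iris_harvest_data = SourceAsset(key="IRIS_DATASET")

@asset(required_resource_keys={"snow_resource"}, non_argument_deps={"IRIS_DATASET"})
def example_downstream(context) -> None:
    # The resource is reached via the asset's context rather than via an input argument.
    context.resources.snow_resource.execute_query(
        "SELECT * FROM IRIS_DATASET LIMIT 150"  # illustrative table name
    )

defs = Definitions(
    assets=[iris_harvest_data, example_downstream],
    resources={
        "snow_resource": snowflake_resource.configured(
            {
                "account": os.getenv("SNOWFLAKE_ACCOUNT"),
                "user": os.getenv("SNOWFLAKE_USER"),
                "password": os.getenv("SNOWFLAKE_PASSWORD"),
                "role": "ACCOUNTADMIN",
                "warehouse": "COMPUTE_WH",
                "database": "DAGSTER_TEST_ASSETS",
                "schema": "PUBLIC",
            }
        ),
    },
)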
jlaurent
02/16/2023, 10:28 AM
loggers_by_deployment_name = {
    "deploy": {"prod_logger": prod_logger},
    "local": {"console_logger": colored_console_logger},
}
env_config = dotenv_values(".env")
deployment_name = env_config["DAGSTER_DEPLOYMENT"]
active_logger = loggers_by_deployment_name[deployment_name]
defs = Definitions(assets=[some_asset], loggers=active_logger)
I need to configure my prod_logger. I have created a YAML config file, but I don't know how to provide it. I've read the documentation several times and I only saw the possibility to pass a config from the command line to a job, or from the dagit launchpad to a job.
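One possible way to provide that config, as a rough sketch: logger config goes under the loggers section of a job's run config, so it can be attached when the job is defined (the log_level field below is a placeholder for whatever prod_logger's config schema actually expects).
from dagster import Definitions, define_asset_job

configured_job = define_asset_job(
    name="all_assets_job",
    config={
        "loggers": {
            "prod_logger": {
                "config": {
                    "log_level": "INFO",  # placeholder; use the fields from the YAML file
                }
            }
        }
    },
)

defs = Definitions(
    assets=[some_asset],      # some_asset and active_logger as defined above
    loggers=active_logger,
    jobs=[configured_job],
)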
Stephen Bailey
02/16/2023, 1:57 PM
hybrid_branch_deploy action where it does not properly link to the branch deployment URL.
peay
02/16/2023, 2:06 PM
Operation name: ShouldShowNux
Message: [Errno 13] Permission denied: '/opt/dagster/dagster_home/.nux'
Path: ["shouldShowNux"]
Locations: [{"line":2,"column":3}]
Dagster is deployed using the Helm chart, and has the following mount:
/opt/dagster/dagster_home/dagster.yaml from dagster-instance (rw,path="dagster.yaml")
It also seems [it would not be displayed anyway as it is only shown on localhost](https://github.com/dagster-io/dagster/blob/master/js_modules/dagit/packages/app/src/NUX/CommunityNux.tsx#L27) -- but we get the GraphQL error anyway.
Hence, indeed, /opt/dagster/dagster_home/.nux cannot be written. Is there a way to avoid this and disable the popup in the chart?
Jonny Wray
02/16/2023, 2:46 PM
airbyte_nasdaq_assets = build_airbyte_assets(
    connection_id=AIRBYTE_NASDAQ_CONNECTION_ID,
    destination_tables=["nasdaq_end_of_day_data"],
    asset_key_prefix=["timescale_source"],
)
and this (and two others) feed into a downstream dependency, which is a dbt asset. So it is very similar to the modern data stack example project.
When I use the UI to materialize the assets - either selectively or the full workflow - I get an error
2023-02-16 14:34:16 +0000 - dagster - DEBUG - __ASSET_JOB - 91cb0a21-d1d0-4a94-896f-da41410b8e52 - 12712 - airbyte_sync_b9e15 - ASSET_MATERIALIZATION - Materialized value timescale_source nasdaq_end-of-day-data.
2023-02-16 14:34:16 +0000 - dagster - INFO - __ASSET_JOB - 91cb0a21-d1d0-4a94-896f-da41410b8e52 - airbyte_sync_b9e15 - op 'airbyte_sync_b9e15' did not fire outputs {'nasdaq_end_of_day_data'}
2023-02-16 14:34:16 +0000 - dagster - ERROR - __ASSET_JOB - 91cb0a21-d1d0-4a94-896f-da41410b8e52 - 12712 - airbyte_sync_b9e15 - STEP_FAILURE - Execution of step "airbyte_sync_b9e15" failed.
dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "airbyte_sync_b9e15" did not return an output for non-optional output "nasdaq_end_of_day_data"
Stack Trace:
File "C:\Work\MTM Data Research Unified\data-engineering\data-pipelines\env\lib\site-packages\dagster\_core\execution\plan\execute_plan.py", line 265, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "C:\Work\MTM Data Research Unified\data-engineering\data-pipelines\env\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 382, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "C:\Work\MTM Data Research Unified\data-engineering\data-pipelines\env\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 175, in _step_output_error_checked_user_event_sequence
raise DagsterStepOutputNotFoundError(
I assume I have a configuration and/or dependency wire up error but I can't find it. Any pointers gratefully received.
I should also say - the Airbyte connection sync is triggered fine and the Dagster console logs contain a bunch of messages suggesting successful progress.
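One detail that stands out in the logs above: the ASSET_MATERIALIZATION line reports nasdaq_end-of-day-data (hyphens), while destination_tables declares nasdaq_end_of_day_data (underscores). If the Airbyte stream really is named with hyphens, a sketch of the change would be (connection id shown as a placeholder):
from dagster_airbyte import build_airbyte_assets

AIRBYTE_NASDAQ_CONNECTION_ID = "<your-connection-id>"  # placeholder

# Assumption: destination_tables must match the Airbyte stream/table names exactly.
airbyte_nasdaq_assets = build_airbyte_assets(
    connection_id=AIRBYTE_NASDAQ_CONNECTION_ID,
    destination_tables=["nasdaq_end-of-day-data"],
    asset_key_prefix=["timescale_source"],
)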
Félix Tremblay
02/16/2023, 3:16 PM
sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140024476800832 and this is thread id 140024381171456.
(Background on this error at: https://sqlalche.me/e/14/f405)
Here's how to reproduce the error:
import concurrent.futures

from dagster import get_dagster_logger, graph, op

def my_function(x):
    logger = get_dagster_logger()
    logger.info(f"Got {x=}")
    return x + 1

@op
def my_op():
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(my_function, range(3)))
    print(results)
    return results

@graph
def my_graph() -> None:
    my_op()

def main() -> None:
    my_graph.execute_in_process()

if __name__ == "__main__":
    main()
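A possible workaround, offered as an assumption rather than an official fix: keep get_dagster_logger out of the worker threads and do the Dagster logging from the op's own thread after the futures complete, e.g.:
import concurrent.futures

from dagster import get_dagster_logger, op

def my_function(x):
    # Pure computation only; no Dagster logger inside the worker threads.
    return x + 1

@op
def my_op():
    logger = get_dagster_logger()  # created and used only in the op's own thread
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(my_function, range(3)))
    for x, result in zip(range(3), results):
        logger.info(f"Got {x=} -> {result}")
    return results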
Rahul Dave
02/16/2023, 4:34 PM
dagster._core.errors.DagsterInvalidDefinitionError: input manager with key 'model_input_manager' required by input 'encoders' of op 'transformer_op' was not provided. Please provide a <class 'dagster._core.storage.input_manager.IInputManagerDefinition'> to key 'model_input_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.input_manager.IInputManagerDefinition'>: ['io_manager', 'output_notebook_io_manager']
which comes from the following definitions:
class FixedPathInputManager(InputManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> str:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")

    def load_input(self, context):
        context.log.info("in load input")
        if context.upstream_output is None:  # input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)

@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
    init_context: InitResourceContext,
) -> FixedPathInputManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathInputManager(base_path=base_path)
and which is used like so:
local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data,
        "data_type": train_type,
        "encoder_file": encoder_file,
        "model_input_manager": local_model_fixedpath_input_manager,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
)
According to the docs, this definition of an InputManager should be enough? Am I being really stupid and missing something super-obvious?
Fabien Gadet
02/16/2023, 5:28 PM
UserWarning: Error loading repository location dagster_poc:dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'dbt' required by op 'run_dbt_05b60' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'dbt', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
I have no clue how to fix that.
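A rough sketch of one way to satisfy that key, assuming the dbt assets come from load_assets_from_dbt_project and the paths below are placeholders: the op generated for the dbt assets looks for a resource under the key "dbt", which dagster-dbt's dbt_cli_resource can provide.
from dagster import Definitions
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

DBT_PROJECT_DIR = "path/to/dbt_project"    # placeholder
DBT_PROFILES_DIR = "path/to/dbt_profiles"  # placeholder

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR
)

defs = Definitions(
    assets=dbt_assets,
    resources={
        # The key must be "dbt" to match the op's required resource key.
        "dbt": dbt_cli_resource.configured(
            {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
        ),
    },
)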
Greg Burd
02/16/2023, 5:36 PM
Saul Burgos
02/16/2023, 5:38 PM
Aaron Roberts
02/16/2023, 6:32 PM
Nicholas Pezolano
02/16/2023, 6:48 PM
stefan hansan
02/16/2023, 6:50 PM
Pablo Beltran
02/16/2023, 7:04 PM
Nikolaj Galak
02/16/2023, 7:25 PM
Rahul Dave
02/16/2023, 7:33 PM
In one op I set an Out with an io_manager_key, and that works. Then I use the same key in another Out in another op, and this is where things blow up. Here is my code:
target_extractor_op = define_dagstermill_op(
    name="target_extractor_op",
    notebook_path=file_relative_path(__file__, "../notebooks/target_extractor.ipynb"),
    output_notebook_name="output_target_extractor",
    outs={"target": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df_train": In(pd.DataFrame)},
)

local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
)

transformer_op = define_dagstermill_op(
    name="transformer_op",
    notebook_path=file_relative_path(__file__, "../notebooks/transform.ipynb"),
    output_notebook_name="output_transform",
    outs={"transformed_data": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df": In(pd.DataFrame), "encoders": In(dict), "datatype": In(str)},
)

local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data,
        "data_type": train_type,
        "encoder_file": encoder_file,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
)
Rahul Dave
02/16/2023, 7:35 PM
dagster._core.errors.DagsterInvalidDefinitionError: io manager with key 'lake_io_manager' required by output 'transformed_data' of op 'transformer_op' was not provided. Please provide a <class 'dagster._core.storage.io_manager.IOManagerDefinition'> to key 'lake_io_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.io_manager.IOManagerDefinition'>: ['io_manager', 'output_notebook_io_manager']
Why would it work for the first (target-extractor) op/graph and not the second? All I want to do is to be able to write custom parquet files for some of my ops... is there some type-based uniqueness to ops? Something else?
Wonjae Lee
02/16/2023, 7:42 PM
Chris Anderson
02/16/2023, 8:26 PM
.collect call eventually, but I'd like to have a limit of only one of those pipelines within a job running at a time. For a visual, in the example photo below I'd like the first mapping index of downstream ops to run and finish execution before the next mapping index is started. I tried per-op prioritization, but it seems (correct me if I'm wrong) that it's a prioritization of launching the op, not of its complete execution. Is there an easy way to apply concurrency limits on a per-mapping-index basis, or some ideas about possible methods of going about this problem?
Dong Kim
02/16/2023, 8:36 PM
Nicholas Pezolano
02/16/2023, 8:42 PM
@asset(partitions_def=my_partitions_def)
def fed_holiday(context) -> bool:
    return is_fed_holiday(context.partition_key)
Philippe Laflamme
02/16/2023, 10:05 PM
I have a UPathIOManager that can provide pandas.DataFrame or pyarrow.Table. This generally works fine and I can switch back and forth, but when I have a downstream asset that requires multiple upstream partitions, I get a type mismatch error, i.e. DagsterTypeCheckDidNotPass. The only way I can get it to work is to have the downstream asset use upstream: Dict[str, X], where X is the type annotation on the IO manager's def load_from_path(...) -> X method. I can't seem to find the way to annotate my methods to make this dynamic.
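For reference, a minimal sketch of the pattern described above, with illustrative names, where the downstream asset that spans several upstream partitions annotates its input as a dict keyed by partition key:
from typing import Dict

import pandas as pd
from dagster import asset

# Assumption: `upstream` is a partitioned asset whose IO manager's
# load_from_path(...) returns pd.DataFrame in this configuration.
@asset
def downstream(upstream: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    # One DataFrame per upstream partition key; stitch them together.
    return pd.concat(upstream.values())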
Tom Reilly
02/16/2023, 11:24 PM
Adam Eury
02/17/2023, 2:34 AM