won
02/01/2022, 6:12 PM
Francis
02/02/2022, 3:08 AM
Stefan Adelbert
02/02/2022, 4:22 AM
dagit
Are there any options for theming dagit, e.g. changing the logo or theme colours?
sourabh upadhye
02/02/2022, 8:12 AM
Roel Hogervorst
02/02/2022, 9:05 AM
Operation name: RootworkspaceQuery. psycopg2.OperationalError: server closed the connection unexpectedly.
There is more to this error, but maybe this is enough? Is this something I can fix?
Srini R
02/02/2022, 9:58 AM
Jay Sharma
02/02/2022, 3:08 PM
from dagster import repository, schedule
@repository
def ingestion_pipeline_repository():
    return get_pipelines() + [my_schedule]
@schedule(cron_schedule="0 22 * * *", pipeline_name="my_pipeline", execution_timezone="US/Eastern")
def my_schedule(context):
    return {}
I'm following this tutorial for scheduling: https://docs.dagster.io/tutorial/advanced-tutorial/scheduling
My setup is Dagster version 0.13.1, and I'm running this through a Docker image.
However, I was previously on Dagster version 0.11.15, and my code is currently legacy code (pipelines, solids, etc.). Not sure if that has anything to do with the error.
Can someone tell me why I'm getting this NameError?
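The following is a minimal pure-Python sketch of one plausible cause. It assumes that @repository calls the decorated function as soon as the decorator is applied, which you should check against your Dagster version. Under that assumption, the repository body looks up my_schedule before the def statement that creates it has run, and that raises a NameError. The names eager_repository, REPO_FIRST, SCHEDULE_FIRST and load are hypothetical, and Dagster itself is not imported.

```python
import textwrap

# Hypothetical stand-in for a decorator that calls the decorated function
# immediately (assumption: this mirrors an eagerly evaluated @repository).
DECORATOR = textwrap.dedent('''
    def eager_repository(fn):
        return fn()  # the body runs here, while the module is being executed
''')

# Same order as the snippet above: repository first, schedule second.
REPO_FIRST = DECORATOR + textwrap.dedent('''
    @eager_repository
    def ingestion_pipeline_repository():
        return [my_schedule]

    def my_schedule(context):
        return {}
''')

# Reordered: the schedule exists before the repository body references it.
SCHEDULE_FIRST = DECORATOR + textwrap.dedent('''
    def my_schedule(context):
        return {}

    @eager_repository
    def ingestion_pipeline_repository():
        return [my_schedule]
''')


def load(source: str) -> dict:
    """Execute module source in a fresh namespace and return that namespace."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace


try:
    load(REPO_FIRST)
except NameError as exc:
    print(f"NameError: {exc}")

ns = load(SCHEDULE_FIRST)
print(ns["ingestion_pipeline_repository"])
```

If this is the cause, moving my_schedule (and get_pipelines, if it lives in the same module) above the @repository function should clear the error.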
Thanks for the help.
Zach
02/02/2022, 3:48 PM
build_op_context()
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:459: in build_op_context
return build_solid_context(
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:512: in build_solid_context
return UnboundSolidExecutionContext(
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:68: in __init__
self._instance = self._instance_cm.__enter__() # pylint: disable=no-member
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py:119: in __enter__
return next(self.gen)
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/api.py:333: in ephemeral_instance_if_missing
with DagsterInstance.ephemeral() as ephemeral_instance:
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/instance/__init__.py:363: in ephemeral
tempdir = DagsterInstance.temp_storage()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@staticmethod
def temp_storage() -> str:
> from dagster.core.test_utils import environ
E ImportError: cannot import name 'environ' from 'dagster.core.test_utils' (/Users/zachary.romer/Documents/empirico/etxpipelines/dagster_0.13.17/lib/python3.9/site-packages/dagster/core/test_utils.py)
../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/instance/__init__.py:488: ImportError
def test_nominal_case(self, mock_file):
    context = build_op_context(op_config={'bucket': 'test-bucket',
                                          'key': 'test-key'})
Sorry, it turns out I was patching something low-level in the test and didn't realize it was interfering with the Dagster internals!
Javier Llorente Mañas
02/02/2022, 4:15 PM
run_config?
Also, I am trying to find some tutorials in this area apart from this one.
Chris Nogradi
02/02/2022, 7:23 PM
Johnny Bravo
02/02/2022, 9:11 PM
@repository
def my_repo():
    return {"jobs": {"my_job": lambda: my_job}}
and by reading the source, I don't see why it would not fail: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/core/definitions/repository_definition.py#L546-L549
Obviously my_job is an instance of JobDefinition, not a lambda, as far as I understand.
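As we understand it, the @repository docstring describes a dict form whose values are functions that return definitions, so that definitions can be built lazily by name. The hypothetical sketch below shows that idea in plain Python. LazyIndex and FakeJob are invented for illustration and are not Dagster's implementation. The index accepts either a ready-made object or a zero-argument factory, and it builds each factory's result once, on first access. It also checks the object type before callable(), so that a definition which happens to be callable is not mistaken for a factory.

```python
from typing import Callable, Dict, Generic, Type, TypeVar, Union

T = TypeVar("T")


class LazyIndex(Generic[T]):
    """Hypothetical name -> definition index that accepts either a ready-made
    definition or a zero-argument function that builds one on first access."""

    def __init__(self, kind: Type[T], entries: Dict[str, Union[T, Callable[[], T]]]) -> None:
        self._kind = kind
        self._built: Dict[str, T] = {}
        self._factories: Dict[str, Callable[[], T]] = {}
        for name, entry in entries.items():
            # Check the definition type first: a definition object that is
            # also callable must not be treated as a factory.
            if isinstance(entry, kind):
                self._built[name] = entry
            elif callable(entry):
                self._factories[name] = entry
            else:
                raise TypeError(f"{name!r} is neither a {kind.__name__} nor a factory")

    def get(self, name: str) -> T:
        if name not in self._built:
            value = self._factories.pop(name)()  # build lazily, exactly once
            if not isinstance(value, self._kind):
                raise TypeError(f"factory for {name!r} returned {type(value).__name__}")
            self._built[name] = value
        return self._built[name]


class FakeJob:
    """Hypothetical stand-in for JobDefinition."""

    def __init__(self, name: str) -> None:
        self.name = name


build_calls = []


def build_expensive_job() -> FakeJob:
    build_calls.append("expensive_job")  # records when the factory actually runs
    return FakeJob("expensive_job")


index = LazyIndex(FakeJob, {"my_job": FakeJob("my_job"), "expensive_job": build_expensive_job})
```

Nothing runs at construction time. index.get("expensive_job") calls the factory once and caches the result, while "my_job" is returned as-is.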
Without wrapping my job, the repo loads as expected; however, it executes my job's code for some reason.
Mykola Palamarchuk
02/02/2022, 9:29 PM
Mykola Palamarchuk
02/02/2022, 9:39 PM
Chris Nogradi
02/02/2022, 11:05 PM
Stefan Adelbert
02/03/2022, 4:05 AM
from dagster import resource
def storage_bucket(dependent_resource_name):
    @resource(
        required_resource_keys={dependent_resource_name},
    )
    def _storage_bucket(init_context):
        credentials = init_context.resources[dependent_resource_name]
        return StorageBucket(credentials)
    return _storage_bucket
The problem is in the line credentials = init_context.resources[dependent_resource_name], which gives me the error TypeError: tuple indices must be integers or slices, not str.
I'm aware that init_context.resources is a _ScopedResources. I also see that _ScopedResources implements __getitem__, which is why I assumed this would work.
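The error message suggests that init_context.resources behaves like a namedtuple. A namedtuple does have __getitem__, but it is tuple's version, which only accepts integer positions and slices. The stdlib sketch below uses a hypothetical ScopedResources namedtuple as a stand-in; how closely it matches the real _ScopedResources is an assumption. It reproduces the failure and shows a getattr lookup that works with a string key.

```python
from collections import namedtuple

# Hypothetical stand-in for the scoped-resources object: a namedtuple whose
# field names are the resource keys (assumption about the real class).
ScopedResources = namedtuple("ScopedResources", ["gcp_credentials", "storage"])


def get_resource(resources: tuple, key: str):
    """Look up a resource by its string key on a namedtuple-like object."""
    return getattr(resources, key)


resources = ScopedResources(gcp_credentials="creds", storage="bucket")

try:
    resources["gcp_credentials"]  # tuple.__getitem__ only accepts ints and slices
except TypeError as exc:
    print(f"TypeError: {exc}")

print(get_resource(resources, "gcp_credentials"))
```

In the resource factory above, the equivalent line would be credentials = getattr(init_context.resources, dependent_resource_name).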
I know that I could call _ScopedResources._asdict() and then get the resource by name.
Can anyone suggest the Dagster way to do this?
Alex Service
02/03/2022, 4:39 AM
GCPFileManager.read_data to retrieve a compressed tarfile (blah.tar.gz) from a bucket. It seems to retrieve the file, but it truncates the last few hundred bytes, so the archive fails to decompress. I verified it’s not using a bad cached version, and if I download the file through other means (gsutil, the GCP UI, etc.), it works as expected. Is this a known issue, or am I just missing something?
Scobie Smith
02/03/2022, 5:07 AM
Roel Hogervorst
02/03/2022, 10:10 AM
Operation name: RootworkspaceQuery. psycopg2.OperationalError: server closed the connection unexpectedly.
https://dagster.slack.com/archives/C01U954MEER/p1643792705600429
Dominik Liebler
02/03/2022, 2:19 PM
Carlos Sanoja
02/03/2022, 5:24 PM
George Pearse
02/03/2022, 5:30 PM
Chris Nogradi
02/03/2022, 5:38 PM
Igor
02/03/2022, 5:44 PM
from dagster import In, Out, Output, ScheduleDefinition, job, op, repository
@op(ins={'front': In(str)}, out=Out(None))
def start(context, front):
    context.log.info(f'{front}')
    yield Output(None)
@job
def data_parser():
    start(1)
    start(1)
job_data_parser_schedule = ScheduleDefinition(cron_schedule='00 00 * * *', job=data_parser,
                                              execution_timezone='Europe/London')
@repository
def repository_parser():
    return [job_data_parser_schedule]
error
dagster.core.errors.DagsterInvalidDefinitionError: In @job data_parser, received invalid type <class 'str'> for input "front" (at position 0) in op invocation "start". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
Igor
02/03/2022, 7:02 PM
volumes:
  - ./files:/work
Does not work.
Elizabeth
02/03/2022, 7:29 PM
get_runs based on run_key in a sensor? It doesn’t appear to be an option for PipelineRunsFilter.
Evan Arnold
02/03/2022, 7:51 PM
job and op imports.
Currently, we have a hand-rolled ETL that is super object-oriented, so we have things like the following:
import pandas as pd
class FolderExtractor:
    def __init__(self, path_to_input: str) -> None:
        ...  # some code
    def run(self) -> list[str]:
        ...  # other code
class DataFrameBuilder:
    def __init__(self, files: list[str]) -> None:
        ...  # code
    def run(self) -> pd.DataFrame:
        ...  # code
This lets us stack the objects:
DataFrameBuilder(FolderExtractor(inp).run()).run()
We chose this pattern because we have pretty complicated internal logic that we wanted to pack away into private methods.
How would one convert this to `op`s? I am kind of thinking you would just create something like:
from dagster import op
@op
def folder_extractor():
    ...
@op
def data_frame_builder():
    ...
But then where do the private methods live? In the same file? Or do you create an object that you hide inside the op? Or...
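One common way to structure this is sketched below; it is a pattern, not something Dagster prescribes. Each class, private helpers included, stays in an ordinary module, and each op becomes a thin function that builds the object and calls run(). The @op decorators are left out so the sketch runs without Dagster. RowCounter is a hypothetical stand-in for DataFrameBuilder that avoids a pandas dependency, and the .csv filter in _keep is invented for illustration.

```python
from pathlib import Path
from typing import Dict, List


class FolderExtractor:
    """Keeps its complicated logic in private helpers, as in the original design."""

    def __init__(self, path_to_input: str) -> None:
        self._root = Path(path_to_input)

    def run(self) -> List[str]:
        return sorted(str(p) for p in self._root.iterdir() if self._keep(p))

    def _keep(self, path: Path) -> bool:  # private helper stays on the class
        return path.is_file() and path.suffix == ".csv"


class RowCounter:
    """Hypothetical stand-in for DataFrameBuilder (avoids a pandas dependency)."""

    def __init__(self, files: List[str]) -> None:
        self._files = files

    def run(self) -> Dict[str, int]:
        return {f: self._count_rows(f) for f in self._files}

    def _count_rows(self, file: str) -> int:
        with open(file) as handle:
            return sum(1 for _ in handle)


# In Dagster, each function below would carry @op; they are left undecorated
# here so the sketch runs without Dagster installed.
def folder_extractor(path_to_input: str) -> List[str]:
    return FolderExtractor(path_to_input).run()


def row_counter(files: List[str]) -> Dict[str, int]:
    return RowCounter(files).run()
```

Chaining the functions as row_counter(folder_extractor(inp)) mirrors DataFrameBuilder(FolderExtractor(inp).run()).run(). The private methods remain testable on their classes, independently of the ops.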
Any & all feedback is very welcome. Perhaps also helpful to say: I've got a decade+ of coding experience, but I am still fairly new to Python.
Evan Arnold
02/03/2022, 9:38 PM
ProgressBar? We have a pretty slow map_partitions/compute thing going on, and the progress bar has been pretty key to debugging (unfortunately).
Anoop Sharma
02/03/2022, 10:06 PM
Stefan Adelbert
02/03/2022, 10:14 PM
dict to the config parameter of @job, and the docs (https://docs.dagster.io/_apidocs/jobs#dagster.job) state:
If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config for the job whenever the job is executed. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.
I can tell that the config I've provided is being used when the job runs, but I'm not seeing that config in the Dagit playground, though. Am I missing something?
Evan Arnold
02/03/2022, 11:07 PM