Andrea Giardini
11/09/2021, 3:42 PM
I'm creating a dynamic op for each one of the elements of the list using this code:
@op(
    output_defs=[DynamicOutputDefinition(str)]
)
def generate_subtasks(context, nums: List[str]):
    context.log.info(str(nums))
    for idx, num in enumerate(nums):
        yield DynamicOutput(num, mapping_key=f'subtask_{idx}')

@graph
def data_inventory():
    gcs_lists = create_list()
    generate_subtasks(gcs_lists).map(process_list)
The issue is that all the generated process_list ops are started at the same time. Is there a way to specify concurrency for the op so that only a certain number of them are running at the same time?
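One option (a rough sketch, not from the thread): with the 0.13 job API, the default multiprocess executor takes a max_concurrent setting in the run config, which caps how many ops — including the mapped process_list ops — run at once. The value below is illustrative.
data_inventory_job = data_inventory.to_job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,  # at most 4 ops execute in parallel
                },
            },
        },
    },
)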
Andrew Brown
11/09/2021, 5:20 PM
Anyone seen a nbconvert.preprocessors.execute.DeadKernelError: Kernel died error in a Dagstermill job? More background: we ran this job successfully on a local machine, then the kernel dies within 20 seconds on our development EC2 instance in Docker (the same Docker config is used on our local machines as on EC2). The only thing that changed was bumping `pyarrow` to version 3.0.0, since that was a requirement for using Dagster’s Pandas dataframe type checks. So my hunch is it’s related to pyarrow, but I’m not sure how to prove it, since it’s such a non-specific error.
Marcel M
11/09/2021, 10:01 PM
Hebo Yang
11/10/2021, 4:50 AM
Simon Späti
11/10/2021, 11:01 AM
I'm working with a dagster_type_loader and a custom DagsterType created with usable_as_dagster_type. I succeeded in loading a custom DagsterType with a dagster_loader function. But now I'd like to have a list of that custom DagsterType, and that does not seem to work; I get: dagster.check.ParameterCheckError: Param "inner_type" is not a DagsterType. Got <class 'graphs_jobs.SqlDict'> which is type <class 'type'>.
Any idea how I can work around that, or any suggestion on how to achieve it differently? I tried many different options, including RootInputManager and the LessSimpleDataFrame example. The code for what I want to achieve, and what worked, is in the thread.
dixel
11/10/2021, 11:02 AM
Is there a way to provide the same config to several ops? I am looking for ways to simplify this statement:
@job(
    name="job1",
    config={
        "ops": {
            "op1": {"config": conf},
            "op2": {"config": conf},
            "op3": {"config": conf},
            "op4": {"config": conf},
        }
    },
)
def job1():
    ...
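One possible simplification (a sketch assuming all four ops share the same config schema; names are taken from the snippet above) is to build the config dict programmatically:
shared_op_names = ["op1", "op2", "op3", "op4"]

@job(
    name="job1",
    config={"ops": {name: {"config": conf} for name in shared_op_names}},
)
def job1():
    ...
A ConfigMapping on the job is another option if the shared value should be supplied once at launch time instead of being baked in.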
Bing
11/10/2021, 12:03 PM
I tried to set up the dask_executor with this guide but always end up with two invalid config errors: Error 1: Received unexpected config entry "in_process" at path root:execution and Error 2: Missing required config entry "config" at path root:execution.
I have tried passing in the config via both the dagit interface and the job attribute directly, but neither works. The dask cluster and the azure storage are working fine. Wondering if anyone else is having the same issue?
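A hedged guess at the config shape that usually clears both errors with the job API: once executor_def=dask_executor is attached there is no in_process/multiprocess key, and the executor's own config goes under execution.config. The scheduler address below is a placeholder.
from dagster import job
from dagster_dask import dask_executor

@job(executor_def=dask_executor)
def my_dask_job():
    ...

run_config = {
    "execution": {
        "config": {
            "cluster": {
                "existing": {"address": "tcp://dask-scheduler:8786"},
            },
        },
    },
}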
Daniel Michaelis
11/10/2021, 12:04 PM
Flavien
11/10/2021, 2:10 PM
from dagster import GraphDefinition, In, Nothing, op, repository
@op(ins={"deps": In(Nothing)})
def say_hello(context):
    context.log.info("Hello!")

def say_hello_graph() -> GraphDefinition:
    return GraphDefinition(
        "say_hello",
        node_defs=[
            say_hello.alias("say_hello_1"),
            say_hello.alias("say_hello_2")
        ],
    )

say_hello_job = say_hello_graph().to_job()

@repository
def say_hello_repository():
    return [say_hello_job]
But I'm getting this error:
AttributeError: 'PendingNodeInvocation' object has no attribute '__name__'
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/grpc/server.py", line 205, in __init__
self._repository_symbols_and_code_pointers.load()
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/grpc/server.py", line 90, in load
self._loadable_repository_symbols = load_loadable_repository_symbols(
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/grpc/server.py", line 108, in load_loadable_repository_symbols
loadable_targets = get_loadable_targets(
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/grpc/utils.py", line 25, in get_loadable_targets
else loadable_targets_from_python_file(python_file, working_directory)
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 17, in loadable_targets_from_python_file
loaded_module = load_python_file(python_file, working_directory)
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 123, in load_python_file
module = import_module_from_path(module_name, python_file)
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/flavi/playground/dagster/sandbox.py", line 18, in <module>
say_hello_job = say_hello_graph().to_job()
File "/home/flavi/playground/dagster/sandbox.py", line 10, in say_hello_graph
return GraphDefinition(
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/core/definitions/graph.py", line 138, in __init__
self._node_defs = _check_node_defs_arg(name, node_defs)
File "/home/flavi/playground/dagster/.venv/lib/python3.9/site-packages/dagster/core/definitions/graph.py", line 84, in _check_node_defs_arg
name=graph_name, func=node_def.__name__
Am I doing something wrong?
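A hedged sketch of one way around this: .alias() returns a pending invocation that is meant to be used inside a @graph composition function rather than passed to GraphDefinition(node_defs=...), which is why the __name__ lookup fails. Composing with @graph avoids the error:
from dagster import In, Nothing, graph, op, repository

@op(ins={"deps": In(Nothing)})
def say_hello(context):
    context.log.info("Hello!")

@graph
def say_hello_graph():
    # aliased invocations are resolved during composition; the Nothing-typed
    # "deps" input is only for ordering and is assumed fine left unconnected
    say_hello.alias("say_hello_1")()
    say_hello.alias("say_hello_2")()

say_hello_job = say_hello_graph.to_job()

@repository
def say_hello_repository():
    return [say_hello_job]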
Andy Chen
11/10/2021, 3:52 PM
with Pool(6) as pool:
    results = pool.starmap(
        parallelizable_function,
        list(zip(items, repeat(context))))
    pool.close()
    pool.join()

def parallelizable_function(item, context):
    context.log.info(f"currently working on {item}")
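A hedged guess at a workaround, since the op context generally isn't picklable: keep the Dagster logging in the op process and hand the pool plain data only (names are illustrative):
from multiprocessing import Pool
from dagster import op

def parallelizable_function(item):
    # runs in a child process; no Dagster context available here
    return item

@op
def process_items(context, items: list):
    with Pool(6) as pool:
        results = pool.map(parallelizable_function, items)
    for item in results:
        context.log.info(f"finished {item}")
    return results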
Mark Kudryk
11/10/2021, 5:59 PM
I have a pipeline scheduled to run every 5 minutes (*/5 * * * *). When I turn the schedule on, every 5 minutes the pipeline is kicked off as expected, and when I turn it off jobs are not initiated, also as expected. What I have observed is that after turning the schedule off and then turning it on again after a period of time, for example at 10:48 am, more pipeline runs are initiated than expected. A run at 10:50 am is kicked off (expected), but runs for ticks at 10:35, 10:40 and 10:45 am are also initiated, times at which the scheduler was off.
Is this correct?
Marc Keeling
11/10/2021, 6:12 PM
dagster.core.errors.DagsterInvalidInvocationError: Compute function of op 'cereal_expectations' has context argument, but no context was provided when invoking.
Here is the code:
from dagster_pandas.validation import PandasColumn
from dagster import job, op, file_relative_path, In, Out
from dagster_ge.factory import ge_data_context, ge_validation_op_factory_v3
from pandas import read_csv
from dagster_pandas import create_dagster_pandas_dataframe_type, DataFrame

cereal_expectations = ge_validation_op_factory_v3(
    name='cereal_expectations',
    datasource_name='cereal_data',
    data_connector_name='default_runtime_data_connector_name',
    data_asset_name='cereal_csv',
    suite_name='cereal.csv.warning',
    batch_identifiers={
        'default_identifer_name': 'default_identifier'
    }
)

@op(
    config_schema={'url': str, 'store_dir': str},
    out={'df': Out(DataFrame)})
def download_csv(context):
    df = read_csv(context.op_config['url'])
    df.to_csv(context.op_config['store_dir'], index=False)
    context.log.info(f"Found {len(df)} cereals")
    return df

@op(
    ins={'df': In(DataFrame)},
    out={'df': Out(DataFrame)},
)
def validate_cereal_expectations(df):
    if cereal_expectations(df)['success']:
        # context.log.info(f'Validation passed for Cereals')
        return df
    else:
        raise ValueError

@job(
    resource_defs={'ge_data_context': ge_data_context},
    config={
        'resources': {
            'ge_data_context': {
                'config': {'ge_root_dir': file_relative_path(__file__, './great_expectations')}
            }
        },
        'ops': {
            'download_csv': {
                'config': {
                    'store_dir': file_relative_path(__file__, '../../data/cereal.csv'),
                    'url': 'https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv'
                }
            }
        }
    }
)
def diamond():
    cereals = download_csv()
    validate_cereal_expectations(cereals)
Thoughts?
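A hedged sketch of one likely fix: cereal_expectations is itself an op, so calling it inside another op's body invokes it without a context. Wiring it into the job composition instead avoids the error (this assumes the factory op's output is the dict the snippet above indexes with ['success']; run config for download_csv is supplied at launch as before):
from dagster import In, Out, job, op

@op(ins={"df": In(DataFrame), "validation": In(dict)}, out=Out(DataFrame))
def check_validation(context, df, validation):
    # 'validation' is the output of the ge_validation_op_factory_v3 op
    if validation["success"]:
        return df
    raise ValueError("Great Expectations validation failed")

@job(resource_defs={"ge_data_context": ge_data_context})
def diamond():
    cereals = download_csv()
    check_validation(cereals, cereal_expectations(cereals))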
Egor -
11/10/2021, 7:19 PM
Carter
11/10/2021, 8:46 PM
def test_sensor_config():
    context = build_sensor_context(cursor="{}")
    for run_request in my_sensor(context):
        assert validate_run_config(my_job, run_request.run_config)
but this raises
dagster.core.errors.DagsterInvariantViolationError: Attempted to initialize dagster instance, but no instance reference was provided.
I assume I need to pass in some type of dagster instance to build_sensor_context but I'm not sure what the best way to create one is in this context - is there a conventional way to create some kind of 'toy' dagster instance object for testing purposes?
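A hedged sketch of what usually works here: build_sensor_context accepts an instance argument, and an ephemeral in-memory instance is typically enough for unit tests:
from dagster import DagsterInstance, build_sensor_context, validate_run_config

def test_sensor_config():
    instance = DagsterInstance.ephemeral()  # throwaway, in-memory instance
    context = build_sensor_context(instance=instance, cursor="{}")
    for run_request in my_sensor(context):
        assert validate_run_config(my_job, run_request.run_config)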
Marc Keeling
11/10/2021, 10:41 PM
cereal_expectations = ge_validation_op_factory_v3(
    name='cereal_expectations',
    datasource_name='data_dir',
    data_connector_name='default_runtime_data_connector_name',
    data_asset_name='data_asset_name',
    suite_name='cereal.csv.warning',
    batch_identifiers={
        'default_identifer_name': 'default_identifier'
    }
)
When I run it I get this error great_expectations.exceptions.exceptions.DataConnectorError: RuntimeDataConnector "default_runtime_data_connector_name" was invoked with one or more batch identifiers that do not appear among the configured batch identifiers.
Does anyone have a code snippet that can show me how to properly configure the ge v3 api?
Alternatively, attached is my great_expectations.yaml file. What values need to be entered to connect them properly?
Francis
11/11/2021, 1:35 PM
Is there a way to automatically clean up $DAGSTER_HOME/history and $DAGSTER_HOME/storage older than, say, 1 week? We keep maxing out the storage of our AWS instance and have to clean up those folders and restart dagster as a result.
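A rough sketch of one approach, assuming the instance APIs get_run_records and delete_run are available in your Dagster version: a small script that deletes runs past a cutoff, which also removes their event-log rows. Files written under $DAGSTER_HOME/storage by the filesystem IO manager may still need a separate filesystem cleanup.
from datetime import datetime, timedelta
from dagster import DagsterInstance

# prune runs older than one week; run periodically, e.g. from cron
cutoff = datetime.utcnow() - timedelta(weeks=1)  # assumes create_timestamp is naive UTC
instance = DagsterInstance.get()
for record in instance.get_run_records():
    if record.create_timestamp < cutoff:
        instance.delete_run(record.pipeline_run.run_id)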
Kyle Downey
11/11/2021, 8:26 PM
Arun Kumar
11/11/2021, 8:58 PM
execution_fn of a Dagster Schedule?
Robert Chandler
11/12/2021, 12:53 PM
Qumber Ali
11/12/2021, 12:57 PM
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: "Reports"
        limit: 2
Job details:
@job(
    resource_defs={"report_details": report_details.get_details},
    tags={"type": "Reports"},
)
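A hedged guess at the mismatch: tag_concurrency_limits matches on the tag key (optionally plus a value), and the job above stores "Reports" as the value under the key "type", so the "Reports" limit never applies. Either add value: "Reports" under key: "type" in dagster.yaml, or tag the job with the key the coordinator is limiting on, e.g.:
@job(
    resource_defs={"report_details": report_details.get_details},
    tags={"Reports": "true"},  # key now matches the "Reports" entry in tag_concurrency_limits
)
def report_job():
    ...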
jay
11/12/2021, 7:20 PM
dagster.core.errors.DagsterInvalidDefinitionError: resource key 'local_fs' is required by op 'get_data', but is not provided. Provide a resource for key 'local_fs', or change 'local_fs' to one of the provided resource keys: ['io_manager'].
It works if I run the file directly with python on the CLI.
Here is my code:
@resource
def my_local_fs(ctx: InitResourceContext):
    yield LocalFS()

@op(name="get_data", required_resource_keys={"local_fs"})
def get_data(context) -> Tuple[str, pd.DataFrame]:
    pass

get_my_data = get_data.alias("get_my_data")

@graph
def my_graph():
    dm = get_my_data()

local_resource_defs = {
    "io_manager": fs_io_manager,
    "local_fs": my_local_fs
}

def my_graph_job():
    return my_graph.to_job(config=my_config(), resource_defs=local_resource_defs)

if __name__ == "__main__":
    result = execute_pipeline(
        reconstructable(my_graph_job),
        instance=DagsterInstance.local_temp(),
    )
I am on Dagster version 0.13.4.
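A hedged sketch, in case the difference comes from how the code is loaded: if Dagit loads the bare graph (for example when the repository or workspace points at my_graph rather than the job built from it), the job-level resource_defs never get attached, which would explain the missing 'local_fs' key. Returning the fully built job from a @repository keeps the resources bound in both cases (names reuse the snippet above):
from dagster import repository

my_graph_job = my_graph.to_job(
    name="my_graph_job",
    config=my_config(),
    resource_defs=local_resource_defs,
)

@repository
def my_repository():
    return [my_graph_job]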
Matthias Queitsch
11/12/2021, 7:31 PM
Ed R
11/13/2021, 1:18 AM
$ curl <redacted>:3000
curl: (7) Failed connect to <redacted>:3000; Connection refused
$ curl localhost:3000
<!doctype html><html lang="en"><head><meta http-equiv="Content-Security-Policy" content="base-uri 'self'; object-src 'none'; script-src 'self' 'nonce-7e8a5890faab49f093326c3048a99ea7'; style-src '.........</script></body></html>
Ed R
11/13/2021, 1:42 AM
Martin Carlsson
11/13/2021, 3:04 PM
dagster.core.errors.DagsterInvalidDefinitionError: op 'log_single_measurement_serie' cannot be downstream of more than one dynamic output. It is downstream of both "yield_installations:result" and "yield_measurement_series:result"
What I’m trying to do:
I’m getting data from an API.
First, I get a list of Installations. And for each installation, I get MeasurementSeries.
So I create a dynamic op for Installations that yields installations - and for each installation I want to create another dynamic op that yields measurement series … and then I want to do some logic for each measurement series.
It is like a loop in a loop.
It doesn’t look like Dagster allows that. So what do I do?
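A hedged sketch of one common workaround, since a step can't be downstream of two dynamic outputs: flatten the loop-in-a-loop inside a single dynamic op that yields one output per (installation, measurement series) pair. The fetch_* helpers are placeholders for the API calls.
from dagster import DynamicOut, DynamicOutput, graph, op

@op(out=DynamicOut())
def yield_installation_series(context):
    for i, installation in enumerate(fetch_installations()):  # placeholder API call
        for j, series in enumerate(fetch_measurement_series(installation)):  # placeholder API call
            yield DynamicOutput((installation, series), mapping_key=f"{i}_{j}")

@op
def log_single_measurement_serie(context, pair):
    installation, series = pair
    context.log.info(f"{installation}: {series}")

@graph
def measurements():
    yield_installation_series().map(log_single_measurement_serie)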
David Farnan-Williams
11/13/2021, 8:50 PM
graph_result = example_dataframe_graph.execute_in_process(
    resources=resource_definitions,
)
I get this exception:
if len(self.input_defs) > 0:
> raise DagsterInvariantViolationError(
"Graphs with inputs cannot be used with execute_in_process at this time."
)
E dagster.core.errors.DagsterInvariantViolationError: Graphs with inputs cannot be used with execute_in_process at this time.
If I make a job out of the graph:
job = example_dataframe_graph_job(resource_defs=dagster_resource_definitions)
job_result = job.execute_in_process()
I get this exception:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\definitions\job.py", line 151, in execute_in_process
return core_execute_in_process(
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\execute_in_process.py", line 33, in core_execute_in_process
execution_plan = create_execution_plan(
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\api.py", line 744, in create_execution_plan
return ExecutionPlan.build(
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\plan\plan.py", line 909, in build
return plan_builder.build()
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\plan\plan.py", line 169, in build
self._build_from_sorted_solids(
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\plan\plan.py", line 240, in _build_from_sorted_solids
step_input_source = get_step_input_source(
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\core\execution\plan\plan.py", line 502, in get_step_input_source
check.failed("unexpected error in composition descent during plan building")
File "c:\Users\dfarnan\source\repos\ProductionPrediction\.conda\lib\site-packages\dagster\check\__init__.py", line 119, in failed
raise CheckError("Failure condition: {desc}".format(desc=desc))
dagster.check.CheckError: Failure condition: unexpected error in composition descent during plan building
The workaround I currently use is to wrap my outer graph in another graph:
@graph
def example_graph():
    return example_dataframe_graph()

def example_job_prod() -> PipelineDefinition:
    return example_graph.to_job(resource_defs=EXAMPLE_RESOURCES_PROD)
Doing this lets Dagster solve the unbound inputs with the root_input_manager for the inner graph before it gets to the outer graph where it doesn't like input_defs. Thus far, when I am testing a graph or creating a job out of it to add to my repository, I use this workaround and it has worked fine.
The issue I'm bumping into is that dagit/dagster-cloud only sees the outer layer of the graph for the graph-query-input selection. So, since I'm having to wrap my graph, it will only let me select the outer graph for execution, which means I can only execute the full graph.
If the job fails at some point during the run it will allow me to re-execute from that failure, which is great. But the step selection part doesn't seem to work with my graphs. No matter what step I tell it to execute from, it will execute the entire graph.
At this point I'm working on testing/tuning kubernetes execution of the graph in dagster-cloud, but I can only really execute the entire graph. The job begins to execute, gets a step or two into the execution, and then fails with a somewhat esoteric error that I can't reproduce locally. I'm pretty new to kubernetes, so it's not surprising my full-scale graph isn't executing successfully right off the bat.
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 14: dagster_cloud.storage.errors.GraphQLStorageError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
I was hoping to be able to execute smaller portions of my graph to get something easier to test without having to make a cartoon graph, but the workaround and path I've gone down has locked me into a bit of a rut. I'm not 100% sure I've got everything setup exactly as it's intended.
I feel like the path I've gone down is a bit fragile, and since we're so close to having a deployed functioning beta I'm open to re-engineering the top level of our dagster warehouse to get something a bit more aligned with vision and stable. I'm hoping to get to a place where I can run our dataframe graphs individually to test/update in kubernetes dagster-cloud setup without having to build several cartoon test ops or having to run the entire graph.
Possible paths I've considered:
1. Create a job out of each of the 5-10 sub graphs that generate dataframes by adding an op to load the foreign dataframe into each graph (a rough sketch of this is below).
2. Wrap dataframe graphs using experimental software-defined assets
3. Create cartoon ops specifically for testing kubernetes/dagster-cloud with azureml dataset io_manager, kubernetes execution, and our blob storage io_manager.
4. Setup local dev kubernetes (currently I'm building this directly against our dev AKS cluster)
I may end up doing multiple of these, but I'm open to any suggestions. I'm going to work on re-engineering the graphs around the experimental software-defined asset path since it is essentially a perfect fit for what we've done with our graphs.
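For what it's worth, a rough sketch of option 1 from the list above — pairing a dataframe graph with a small loader op so it can be launched and step-re-executed on its own. It assumes the sub-graph takes a single upstream dataframe input; the resource and names are illustrative.
from dagster import graph, op

@op(required_resource_keys={"warehouse_io"})
def load_foreign_dataframe(context):
    # placeholder: fetch the upstream dataframe however your storage/IO manager exposes it
    return context.resources.warehouse_io.load("foreign_dataframe")

@graph
def example_dataframe_standalone():
    example_dataframe_graph(load_foreign_dataframe())

example_dataframe_job = example_dataframe_standalone.to_job(resource_defs=EXAMPLE_RESOURCES_PROD)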
Pavel Vasek
11/14/2021, 4:36 PM
I looked at the DockerRunLauncher but could not figure out how this works.
Sara
11/14/2021, 6:45 PM
Qumber Ali
11/15/2021, 6:25 AM
Marcel M
11/15/2021, 4:33 PM
Where should I put global setup code like this rollbar initialization?
import rollbar
rollbar.init('<my token here>')
I thought of putting this code in repository.py. But where should I put the config (the token) then? Not in dagster.yaml/resources.yaml/workspace.yaml. Is there another config file for general/global config?
OTOH I thought of putting the code in a resource, however the code is not directly related to ops or jobs. So how would one access the resource then?
Thanks for shedding light on this.
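A hedged sketch of the resource route, since resources carry their own config independent of any particular op: wrap the rollbar setup in a resource, attach it to the jobs that should report, and supply the token via resource config (reading it from an environment variable keeps it out of the yaml files). Names are illustrative.
import os
import rollbar
from dagster import resource

@resource(config_schema={"token": str})
def rollbar_resource(init_context):
    rollbar.init(init_context.resource_config["token"])
    return rollbar

# attached where needed, for example:
# my_job = my_graph.to_job(
#     resource_defs={"rollbar": rollbar_resource},
#     config={"resources": {"rollbar": {"config": {"token": os.getenv("ROLLBAR_TOKEN", "")}}}},
# )
Ops that should report can then declare required_resource_keys={"rollbar"}, and a @failure_hook is another natural place to trigger the reporting from.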