Thomas Yopes
06/01/2021, 11:52 AM
Is there a way to express a Nothing dependency using DependencyDefinition that connects two composite solids?
Use case I'm working with: composite solid 1 runs N solids, composite solid 2 runs M solids, and there's no obvious data dependency between them (it's lots of db / table setup work). I'm building the dependency dictionary object, but I can't introduce an argument to the second composite solid that's Nothing (and even if I introduce an argument, I can't run the pipeline, since that argument has to be used in the solids, which I would like to avoid) -- yet the documentation suggests I need an input value.
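
A minimal sketch of the ordering-only dependency being asked about, using Dagster's Nothing type on inputs and outputs (the solid, composite, and pipeline names here are hypothetical):
from dagster import (
    InputDefinition,
    Nothing,
    OutputDefinition,
    composite_solid,
    pipeline,
    solid,
)

@solid(output_defs=[OutputDefinition(Nothing)])
def create_tables(_):
    pass  # db / table setup work; no value is passed downstream

@solid(input_defs=[InputDefinition("start", Nothing)])
def seed_tables(_):
    pass  # a Nothing input never appears in the compute function's signature

@composite_solid(output_defs=[OutputDefinition(Nothing)])
def setup_one():
    return create_tables()

@composite_solid(input_defs=[InputDefinition("start", Nothing)])
def setup_two(start):
    # inside a composition function, a Nothing input is only a wiring handle
    seed_tables(start)

@pipeline
def setup_pipeline():
    setup_two(setup_one())  # ordering-only: setup_two waits on setup_one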

Raghu
06/01/2021, 2:46 PM

Dan Corbiani
06/01/2021, 5:46 PM

Jordan W
06/01/2021, 6:26 PM
I'm trying to use async where possible, but I'm having a little issue getting a basic working example to execute asynchronously. As of now, the basic example solids execute one after another. I saw in the docs that execute_pipeline_iterator might have to be used. I was wondering whether there is any async execution for the dagit / dagster CLI (apart from execute_pipeline_iterator), and if I could get some help with my example. Appreciate any and all help.
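
For reference, Dagster accepts async def compute functions; a minimal sketch is below. Note that the in-process executor still awaits each solid one at a time, so parallelism across solids generally comes from an executor such as the multiprocess executor rather than from asyncio.
import asyncio

from dagster import pipeline, solid

@solid
async def async_solid(context):
    # Dagster awaits the coroutine before moving on to the next solid
    await asyncio.sleep(1)
    return 1

@pipeline
def async_pipeline():
    async_solid()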

Rei Tamai
06/02/2021, 8:21 AM

sk4la
06/02/2021, 12:23 PM
Is there a way to load repositories from multiple packages when using the gRPC server (dagster api grpc)?
I currently load my repositories locally from multiple packages using the - python_package: ... stanza in workspace.yaml, but that does not seem possible with the gRPC server, as it does not make use of the local workspace.yaml file. Any ideas?
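
For context, a sketch of how a gRPC-based workspace is typically wired up: each dagster api grpc process serves one code location, so multiple packages generally mean multiple grpc_server entries (the hosts, ports, and package names below are assumptions):
# one server per package
dagster api grpc --module-name package_one --host 0.0.0.0 --port 4266
dagster api grpc --module-name package_two --host 0.0.0.0 --port 4267

# workspace.yaml on the Dagit side
load_from:
  - grpc_server:
      host: localhost
      port: 4266
      location_name: "package_one"
  - grpc_server:
      host: localhost
      port: 4267
      location_name: "package_two"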

Thomas Yopes
06/02/2021, 12:24 PM

Dan Corbiani
06/02/2021, 12:51 PM

Andrew Herbst
06/02/2021, 3:36 PM
Is it possible for a solid to depend on both a Nothing output and a String output? e.g. solid1 -> Nothing, solid2 -> String, solid3(solid1(), solid2()).
When I try this I get an error about unmapped parameters. I have a gist that illustrates the issue if that would help.
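
A sketch of the shape being described, assuming the Nothing input is declared via InputDefinition and deliberately left out of the compute function's parameters (names hypothetical):
from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(Nothing)])
def solid1(_):
    pass  # Nothing output: signals completion only

@solid
def solid2(_):
    return "hello"

@solid(input_defs=[InputDefinition("start", Nothing), InputDefinition("word", str)])
def solid3(context, word):
    # "start" is absent from the parameters because Nothing carries no value
    context.log.info(word)

@pipeline
def mixed_pipeline():
    solid3(start=solid1(), word=solid2())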

Dylan Bienstock
06/02/2021, 5:03 PM
I'm creating a GraphQL client with client = DagsterGraphQLClient("localhost", port_number=3000), and it works when I run Dagit locally. However, when I deploy it to my Dagster k8s instance, it is not able to connect. Is there a different port I should be using? Or do I need to open something up to allow it to communicate?
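
Not an answer to the cluster specifics, but the usual adjustment when the client runs inside Kubernetes is to point it at the Dagit service rather than localhost. The service DNS name and port below are assumptions; check kubectl get svc for the real values:
from dagster_graphql import DagsterGraphQLClient

# hypothetical: Dagit service "dagit" in namespace "dagster", exposed on port 80
client = DagsterGraphQLClient(
    "dagit.dagster.svc.cluster.local",
    port_number=80,
)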

John Conwell
06/02/2021, 5:22 PM
@pipeline(mode_defs=[cluster_mode_def])
def build_models():
    # determine if we're building or copying the training data set from production
    if config['BUILD_DATA_SET']:
        training_data_set = cmds.build_training_data_set(cmds.training_data_config())
    else:
        training_data_set = cmds.copy_training_data_set()
    # build vectors for each feature family, build the full vector space, then build the model
    cmds.build_model(
        cmds.build_feature_space(
            cmds.build_feature_vectors(
                training_data_set
            )
        )
    )
Based on some config state, I'd want to either build my training dataset or just copy it from a production environment. But since pipelines don't execute -- they just build the solid DAG structure -- and don't have access to any configuration information, how would I do this?
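
A sketch of one common way to handle this: keep both paths in the DAG and branch at run time inside a solid with two optional outputs, driven by solid config (the config key and names below are hypothetical):
from dagster import Output, OutputDefinition, solid

@solid(
    config_schema={"build_data_set": bool},
    output_defs=[
        OutputDefinition(name="build", is_required=False),
        OutputDefinition(name="copy", is_required=False),
    ],
)
def choose_data_source(context):
    # exactly one optional output is yielded, so only the solids wired
    # to that output will execute; the other branch is skipped
    if context.solid_config["build_data_set"]:
        yield Output(None, "build")
    else:
        yield Output(None, "copy")
Each branch then hangs off one of the two outputs, and the rest of the DAG can fan back in downstream of whichever branch ran.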

Eduardo Santizo
06/02/2021, 6:12 PM

Tom Han
06/02/2021, 6:30 PM
My schedule runs keep failing with this error:
dagster.check.CheckError: Invariant failed. Description: Attempted to deserialize class "ScheduleExecutionData" which is not in the whitelist.
  File "/usr/local/lib/python3.7/site-packages/dagster/scheduler/scheduler.py", line 212, in launch_scheduled_runs_for_schedule
    debug_crash_flags,
  File "/usr/local/lib/python3.7/site-packages/dagster/scheduler/scheduler.py", line 258, in _schedule_runs_at_time
    scheduled_execution_time=schedule_time,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/repository_location.py", line 687, in get_external_schedule_execution_data
    scheduled_execution_time,
  File "/usr/local/lib/python3.7/site-packages/dagster/api/snapshot_schedule.py", line 57, in sync_get_external_schedule_execution_data_grpc
    else None,
  File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 278, in external_schedule_execution
    "".join([chunk.serialized_chunk for chunk in chunks])
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 242, in deserialize_json_to_dagster_namedtuple
    check.str_param(json_str, "json_str"), whitelist_map=_WHITELIST_MAP
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 252, in _deserialize_json_to_dagster_namedtuple
    return _unpack_value(seven.json.loads(json_str), whitelist_map=whitelist_map)
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 278, in _unpack_value
    f'Attempted to deserialize class "{klass_name}" which is not in the whitelist.',
  File "/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py", line 167, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
My running environment is standard k8s on the cloud, and the schedule code is as follows:
@daily_schedule(
    pipeline_name='pipeline_daily_partition',
    execution_timezone="Asia/Shanghai",
    execution_time=datetime.time(hour=23, minute=41),
    start_date=datetime.datetime(2010, 5, 27),
    partition_days_offset=0,
    mode=prd_partition_preset.mode,
)
def daily_schedule_partition(date):
    def nested_update(d: dict, target, value):
        # replace every key named `target` anywhere in the nested dict
        for k, v in d.copy().items():
            if isinstance(v, dict):
                nested_update(v, target, value)
            elif k == target:
                d[k] = value

    modified_run_config = prd_partition_preset.run_config.copy()
    nested_update(modified_run_config, 'date', date)
    return modified_run_config

Jordan W
06/02/2021, 7:47 PM
Is it possible to use dagster.mem_io_manager when running asynchronous solids? I seem to be getting an error when using it as my IO manager. When I use the other default, fs_io_manager, it does work. Thanks in advance!

Marco
06/03/2021, 8:32 AM

Thomas Yopes
06/03/2021, 11:53 AM
Is there a way to get at the metadata attached to an asset via events_for_asset_key? Can't seem to find it in the examples anywhere.
I'm working with the example code in the sensor as follows:
record_id, event = events[0]
But I'm wondering if there's a way to get the metadata I attached to the asset.
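
A guess at the drill-down, assuming the second element of each tuple is an event record wrapping the materialization; the exact attribute path is an assumption worth verifying against the version in use:
record_id, event = events[0]

# assumption: the AssetMaterialization and its metadata entries hang
# off the wrapped dagster_event
materialization = event.dagster_event.event_specific_data.materialization
for entry in materialization.metadata_entries:
    print(entry.label, entry.entry_data)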

wangm23456
06/03/2021, 1:21 PM

wangm23456
06/03/2021, 1:21 PM
import random

from dagster import Output, OutputDefinition, pipeline, solid, composite_solid

@solid(
    output_defs=[
        OutputDefinition(name="branch_1", is_required=False),
        OutputDefinition(name="branch_2", is_required=False),
    ]
)
def branching_solid_test(context):
    num = 0
    context.log.info(str(num))
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")

@solid
def branch_1_solid(_input):
    return 1

@solid
def branch_2_solid(_input):
    return 2

@solid
def s1_solid(_input):
    return 1

@solid
def s2_solid(_input):
    return 2

@composite_solid
def solid1(_input):
    s = branch_1_solid(_input)
    s = s1_solid(_input)
    return s

@composite_solid
def solid2(_input):
    s = branch_2_solid(_input)
    s = s2_solid(_input)
    return s

@solid
def final(_s):
    return _s

@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid_test()
    s = solid1(branch_1)
    s = solid2(branch_2)
    final(s)

wangm23456
06/03/2021, 1:23 PM

wangm23456
06/03/2021, 1:26 PM

Akshay Verma
06/03/2021, 3:04 PM

Ethan Brown
06/03/2021, 4:01 PM

Brian Seo
06/03/2021, 5:42 PM

Matyas Tamas
06/03/2021, 8:00 PM

Arun Kumar
06/03/2021, 8:26 PM

Andrew Herbst
06/03/2021, 9:31 PM

David
06/04/2021, 7:04 AM
When using raise RetryRequested(max_retries=2), is there a way to determine within the solid that it is running as a retry, and how many times it has already been retried? Thanks!
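
For reference, the documented shape of a retrying solid is sketched below (do_work and TransientError are hypothetical placeholders). Whether the compute context exposes the current attempt number, e.g. a retry_number property, depends on the Dagster version and is worth checking:
from dagster import RetryRequested, solid

@solid
def flaky_solid(context):
    try:
        do_work()  # hypothetical flaky operation
    except TransientError as exc:  # hypothetical transient failure
        # ask Dagster to re-run this solid up to two more times
        raise RetryRequested(max_retries=2) from exc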

Thomas Yopes
06/04/2021, 9:13 AM
Is it possible to call execute_pipeline from a solid in another pipeline? I've tried lots of different ways to get Dagster to run a schedule for a pipeline that needs to be regenerated fresh when the scheduler starts, but haven't been able to make any progress (lots of config, timing, and invariant errors -- I tried sensors, assets, etc. with no luck). For the time being, I'm simply going to have a helper pipeline that calls the code required to generate the dynamic pipeline and then calls execute_pipeline, and it would be great if this dynamic pipeline appeared in the Runs tab. Let me know! Or let me know if you're someone who's managed to get Docker + Dagster working with dynamic pipeline generation (note: I don't mean dynamic outputs, but defining the entire pipeline up front before it's executed).
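
A sketch of the helper-pipeline approach described above; executing against the persistent instance is what makes the nested run show up in Dagit's Runs tab (build_dynamic_pipeline is a hypothetical placeholder that returns a PipelineDefinition):
from dagster import DagsterInstance, execute_pipeline, pipeline, solid

@solid
def run_generated_pipeline(context):
    dynamic_pipeline = build_dynamic_pipeline()  # hypothetical generator
    result = execute_pipeline(
        dynamic_pipeline,
        instance=DagsterInstance.get(),  # persistent instance -> visible in the Runs tab
    )
    return result.success

@pipeline
def helper_pipeline():
    run_generated_pipeline()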

Denis Rakhimov
06/04/2021, 10:32 AM

Darren Haken
06/04/2021, 2:57 PM