Abhishek Agrawal
06/23/2023, 6:24 AM

yuhan
06/23/2023, 6:59 PM

Abhishek Agrawal
06/26/2023, 10:39 AM

Abhishek Agrawal
06/26/2023, 10:49 AM
submit_job_execution function? Basically, create AssetJobs on the fly?

yuhan
06/26/2023, 2:45 PM

Abhishek Agrawal
06/26/2023, 4:32 PM
op_selection argument, I'm getting an error. I should be able to select assets using the op_selection field, right?

Abhishek Agrawal
06/26/2023, 5:05 PM
{'executionParams': {'selector': {'repositoryLocationName': 'pipeline', 'repositoryName': '__repository__', 'pipelineName': 'datamart_load', 'solidSelection': <dagster._core.definitions.asset_selection.KeysAssetSelection object at 0x3e2721938f10>}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}

Abhishek Agrawal
06/26/2023, 5:09 PM
DEFAULT 2023-06-26T16:58:10.197598Z "text": "*Error Details/Message:*\nFailed to trigger the job. Error message: Exception occured during execution of query
mutation($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename
    ... on InvalidStepError {
      invalidStepKey
    }
    ... on InvalidOutputError {
      stepKey
      invalidOutputName
    }
    ... on LaunchPipelineRunSuccess {
      run {
        runId
      }
    }
    ... on ConflictingExecutionParamsError {
      message
    }
    ... on PresetNotFoundError {
      message
    }
    ... on PipelineRunConflict {
      message
    }
    ... on PipelineConfigValidationInvalid {
      errors {
        __typename
        message
        path
        reason
      }
    }
    ... on PipelineNotFoundError {
      message
    }
    ... on PythonError {
      message
    }
  }
}
with variables
{'executionParams': {'selector': {'repositoryLocationName': 'data-os@data-pipeline', 'repositoryName': '__repository__', 'pipelineName': 'abi_dev_3_datamart_load', 'solidSelection': <dagster._core.definitions.asset_selection.KeysAssetSelection object at 0x3e2721938f10>}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}
"

Abhishek Agrawal
06/26/2023, 5:09 PM

Abhishek Agrawal
06/26/2023, 5:11 PM
submit_job_execution method?

yuhan
06/26/2023, 6:51 PM

Abhishek Agrawal
06/27/2023, 12:32 AM
op_selection argument and got a different error:
{
"textPayload": "INFO:gql.transport.requests:>>> {\"query\": \"mutation ($executionParams: ExecutionParams!) {\\n launchPipelineExecution(executionParams: $executionParams) {\\n __typename\\n ... on InvalidStepError {\\n invalidStepKey\\n }\\n ... on InvalidOutputError {\\n stepKey\\n invalidOutputName\\n }\\n ... on LaunchPipelineRunSuccess {\\n run {\\n runId\\n }\\n }\\n ... on ConflictingExecutionParamsError {\\n message\\n }\\n ... on PresetNotFoundError {\\n message\\n }\\n ... on PipelineRunConflict {\\n message\\n }\\n ... on PipelineConfigValidationInvalid {\\n errors {\\n __typename\\n message\\n path\\n reason\\n }\\n }\\n ... on PipelineNotFoundError {\\n message\\n }\\n ... on PythonError {\\n message\\n }\\n }\\n}\", \"variables\": {\"executionParams\": {\"selector\": {\"repositoryLocationName\": \"data-os@data-pipeline\", \"repositoryName\": \"__repository__\", \"pipelineName\": \"abi_dev_3_datamart_load\", \"solidSelection\": \"abi_dev_3_dm_stg_paid_media_mappings\"}, \"runConfigData\": {}, \"mode\": \"default\", \"executionMetadata\": {\"tags\": [{\"key\": \"user\", \"value\": \"Middleware\"}]}}}}"
In the snapshot, you can see the stack trace. I am passing the job name as a string, and it is causing a KeyError in the Dagster libraries.

yuhan
06/27/2023, 9:42 PM
submit_job_execution takes op_selection as Optional[Sequence[str]]
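A plain Python `str` is itself a `Sequence[str]`, so passing a single name as a bare string type-checks but goes over the wire as a string rather than a list; the 12:32 AM log shows exactly that (`"solidSelection": "abi_dev_3_dm_stg_paid_media_mappings"`). A small guard, assuming you want to accept either form; `normalize_op_selection` is a hypothetical helper. It only fixes the shape of the argument: `op_selection` names ops, and the thread later moves to `assetSelection` for selecting assets.

```python
from typing import List, Optional, Sequence


def normalize_op_selection(op_selection: Optional[Sequence[str]]) -> Optional[List[str]]:
    """Return op_selection as a list of op names (hypothetical helper).

    A bare str also satisfies Sequence[str], so it is wrapped in a
    one-element list instead of being passed through as a string.
    """
    if op_selection is None:
        return None
    if isinstance(op_selection, str):
        return [op_selection]
    return list(op_selection)


print(normalize_op_selection("my_op"))           # ['my_op']
print(normalize_op_selection(["op_a", "op_b"]))  # ['op_a', 'op_b']
```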

Abhishek Agrawal
06/27/2023, 9:51 PM
[AssetKey("<key>")] or ["<key>"]. As in, a list of AssetKey or just a list of strings, but it didn't like it.

Abhishek Agrawal
06/27/2023, 9:55 PM
"text": "*Error Details/Message:*\nFailed to trigger the job. Error message: Exception occured during execution of query \n\nmutation($executionParams: ExecutionParams!) {\n launchPipelineExecution(executionParams: $executionParams) {\n __typename\n\n ... on InvalidStepError {\n invalidStepKey\n }\n ... on InvalidOutputError {\n stepKey\n invalidOutputName\n }\n ... on LaunchPipelineRunSuccess {\n run {\n runId\n }\n }\n ... on ConflictingExecutionParamsError {\n message\n }\n ... on PresetNotFoundError {\n message\n }\n ... on PipelineRunConflict {\n message\n }\n ... on PipelineConfigValidationInvalid {\n errors {\n __typename\n message\n path\n reason\n }\n }\n ... on PipelineNotFoundError {\n message\n }\n ... on PythonError {\n message\n }\n }\n}\n\n with variables \n{'executionParams': {'selector': {'repositoryLocationName': 'data-os@data-pipeline', 'repositoryName': '__repository__', 'pipelineName': 'abi_dev_3_datamart_load', 'solidSelection': [AssetKey(['abi_dev_3_dm_stg_paid_media_mappings'])]}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}\n""

Abhishek Agrawal
06/27/2023, 10:09 PM
op_selection.

owen
06/28/2023, 4:49 PM
"assetSelection": [key.to_graphql_input() for key in asset_selection]
into the "selector" of your executionParams

Abhishek Agrawal
09/13/2023, 12:13 AM
pipeline_utils_client is just an object of DagsterGraphQLClient.
pipeline_utils_client = pipeline_utils.PipelineUtilsClass()
asset_selection = [AssetSelection.keys("xxxx_4_untyped_vw")]
run_id = pipeline_utils_client.client._execute(
    query="""
    mutation($executionParams: ExecutionParams!) {
      launchPipelineExecution(executionParams: $executionParams) {
        __typename
        ... on InvalidStepError {
          invalidStepKey
        }
        ... on InvalidOutputError {
          stepKey
          invalidOutputName
        }
        ... on LaunchPipelineRunSuccess {
          run {
            runId
          }
        }
        ... on ConflictingExecutionParamsError {
          message
        }
        ... on PresetNotFoundError {
          message
        }
        ... on PipelineRunConflict {
          message
        }
        ... on PipelineConfigValidationInvalid {
          errors {
            __typename
            message
            path
            reason
          }
        }
        ... on PipelineNotFoundError {
          message
        }
        ... on PythonError {
          message
        }
      }
    }
    """,
    variables={
        "executionParams": {
            "selector": {
                "repositoryLocationName": "test_loc.py",
                "repositoryName": "__repository__",
                "assetSelection": asset_selection
            }
        }
    }
)
It's giving an error. What am I doing wrong?

owen
09/13/2023, 12:13 AM

owen
09/13/2023, 12:14 AM
"assetSelection": [key.to_graphql_input() for key in asset_selection]
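owen's suggestion puts the asset keys, converted to GraphQL input objects, under `selector.assetSelection`. A stdlib-only sketch of the resulting `variables`, assuming `AssetKey.to_graphql_input()` yields `{"path": [...]}`, with each key given here as its list of path components. `build_execution_params` is a hypothetical helper, not a `DagsterGraphQLClient` method; it keeps the `pipelineName` field that the June payloads used to identify the job, and the names passed in below are simply the ones from this thread.

```python
from typing import Any, Dict, List, Sequence


def build_execution_params(
    location_name: str,
    repository_name: str,
    job_name: str,
    asset_paths: Sequence[Sequence[str]],
) -> Dict[str, Any]:
    """Build `variables` for the launchPipelineExecution mutation (hypothetical helper).

    Each asset key is given as its path components, e.g. ["my_asset"] or
    ["schema", "table"], and encoded in the {"path": [...]} shape that
    AssetKey.to_graphql_input() is assumed to produce.
    """
    asset_selection: List[Dict[str, List[str]]] = [
        {"path": list(path)} for path in asset_paths
    ]
    return {
        "executionParams": {
            "selector": {
                "repositoryLocationName": location_name,
                "repositoryName": repository_name,
                "pipelineName": job_name,
                # Assets go here; solidSelection takes op names, not asset keys.
                "assetSelection": asset_selection,
            },
            "runConfigData": {},
            "mode": "default",
        }
    }


variables = build_execution_params(
    "data-os@data-pipeline",
    "__repository__",
    "abi_dev_3_datamart_load",
    [["abi_dev_3_dm_stg_paid_media_mappings"]],
)
print(variables["executionParams"]["selector"]["assetSelection"])
# [{'path': ['abi_dev_3_dm_stg_paid_media_mappings']}]
```

Because the output is plain dicts, lists, and strings, it serializes to JSON as-is, unlike the selection objects in the earlier logs.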

Abhishek Agrawal
09/13/2023, 12:18 AM
'KeysAssetSelection' object has no attribute 'to_graphql_input'

Abhishek Agrawal
09/13/2023, 12:18 AM
asset_selection = [AssetSelection.keys("xxxx_4_untyped_vw")]
this returns a KeysAssetSelection

Abhishek Agrawal
09/13/2023, 12:32 AM

owen
09/13/2023, 4:09 PM
asset_selection in that to_graphql_input code to be of type List[AssetKey]. If you're really just selecting assets by key (and not doing any of the fancier AssetSelection.keys(...).upstream() stuff), then I'd recommend just doing asset_selection = [AssetKey("asset1"), AssetKey("asset2")]. But if you're using the fancier stuff, then you can resolve your AssetSelection object into a list of keys by doing AssetSelection.keys(...).resolve(defs.get_repository_def().asset_graph) (where defs is your Definitions object).
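The last step owen describes, going from resolved keys to the `assetSelection` list, can be sketched without Dagster installed by representing each key by its path tuple. This assumes `resolve(...)` hands back an unordered set of keys, so the sketch sorts them to keep the request payload deterministic; `keys_to_graphql_inputs` is a hypothetical name. With real `AssetKey` objects, the analogous expression would presumably be `[k.to_graphql_input() for k in sorted(keys, key=lambda k: k.path)]`.

```python
from typing import AbstractSet, Dict, List, Tuple


def keys_to_graphql_inputs(keys: AbstractSet[Tuple[str, ...]]) -> List[Dict[str, List[str]]]:
    """Turn a set of asset-key paths into a sorted list of {"path": [...]} dicts.

    Hypothetical helper: sorting makes the payload stable across runs,
    since set iteration order is not something to rely on.
    """
    return [{"path": list(path)} for path in sorted(keys)]


resolved = {("asset2",), ("asset1",), ("schema", "table")}
print(keys_to_graphql_inputs(resolved))
# [{'path': ['asset1']}, {'path': ['asset2']}, {'path': ['schema', 'table']}]
```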