Is it possible to use op_selection argument to sel...
# ask-community
a
Is it possible to use the `op_selection` argument to select a particular asset to materialise in an asset job using the Python GraphQL Client? https://docs.dagster.io/_apidocs/libraries/dagster-graphql#dagster_graphql.DagsterGraphQLClient.submit_job_execution
y
a
Hey @yuhan, I tried to pass an asset selection to the `op_selection` field, selecting a subset of my asset job. However, it gave me an error. I also tried to pass just one asset key from my asset job, but that didn't work either. Am I doing something wrong?
On second thought, did you mean I should create an AssetJob and pass that job's name to the `submit_job_execution` function? Basically, create AssetJobs on the fly?
y
Right, I meant that you'd need to create an asset job and apply `asset_selection` on that asset job.
a
Thanks for your reply. I think I am getting it wrong. I already have asset jobs defined in my code definitions. Now, in my Cloud Run service, I am using the Python GraphQL Client to trigger certain jobs. It works like a charm when I have to run the complete job, but in this specific scenario I have an AssetJob and I only want to materialise a subset of its assets. I can't create a new AssetJob here, as it won't be found in the code definitions. If I use the existing AssetJob name and pass one of the job's asset keys via the `op_selection` argument, I get an error. I should be able to select assets using the `op_selection` field, right?
These are the execution params, as shown in the error message logged by my service:
{'executionParams': {'selector': {'repositoryLocationName': 'pipeline', 'repositoryName': '__repository__', 'pipelineName': 'datamart_load', 'solidSelection': <dagster._core.definitions.asset_selection.KeysAssetSelection object at 0x3e2721938f10>}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}
DEFAULT 2023-06-26T16:58:10.197598Z "text": "*Error Details/Message:*\nFailed to trigger the job. Error message: Exception occured during execution of query
DEFAULT 2023-06-26T16:58:10.197602Z mutation($executionParams: ExecutionParams!) {
DEFAULT 2023-06-26T16:58:10.197606Z launchPipelineExecution(executionParams: $executionParams) {
DEFAULT 2023-06-26T16:58:10.197609Z __typename
DEFAULT 2023-06-26T16:58:10.197614Z ... on InvalidStepError {
DEFAULT 2023-06-26T16:58:10.197617Z invalidStepKey
DEFAULT 2023-06-26T16:58:10.197621Z }
DEFAULT 2023-06-26T16:58:10.197625Z ... on InvalidOutputError {
DEFAULT 2023-06-26T16:58:10.197628Z stepKey
DEFAULT 2023-06-26T16:58:10.197631Z invalidOutputName
DEFAULT 2023-06-26T16:58:10.197635Z }
DEFAULT 2023-06-26T16:58:10.197639Z ... on LaunchPipelineRunSuccess {
DEFAULT 2023-06-26T16:58:10.197642Z run {
DEFAULT 2023-06-26T16:58:10.197645Z runId
DEFAULT 2023-06-26T16:58:10.197648Z }
DEFAULT 2023-06-26T16:58:10.197651Z }
DEFAULT 2023-06-26T16:58:10.197654Z ... on ConflictingExecutionParamsError {
DEFAULT 2023-06-26T16:58:10.197675Z message
DEFAULT 2023-06-26T16:58:10.197678Z }
DEFAULT 2023-06-26T16:58:10.197682Z ... on PresetNotFoundError {
DEFAULT 2023-06-26T16:58:10.197685Z message
DEFAULT 2023-06-26T16:58:10.197687Z }
DEFAULT 2023-06-26T16:58:10.197691Z ... on PipelineRunConflict {
DEFAULT 2023-06-26T16:58:10.197695Z message
DEFAULT 2023-06-26T16:58:10.197697Z }
DEFAULT 2023-06-26T16:58:10.197701Z ... on PipelineConfigValidationInvalid {
DEFAULT 2023-06-26T16:58:10.197704Z errors {
DEFAULT 2023-06-26T16:58:10.197707Z __typename
DEFAULT 2023-06-26T16:58:10.197710Z message
DEFAULT 2023-06-26T16:58:10.197713Z path
DEFAULT 2023-06-26T16:58:10.197716Z reason
DEFAULT 2023-06-26T16:58:10.197718Z }
DEFAULT 2023-06-26T16:58:10.197721Z }
DEFAULT 2023-06-26T16:58:10.197724Z ... on PipelineNotFoundError {
DEFAULT 2023-06-26T16:58:10.197727Z message
DEFAULT 2023-06-26T16:58:10.197730Z }
DEFAULT 2023-06-26T16:58:10.197733Z ... on PythonError {
DEFAULT 2023-06-26T16:58:10.197736Z message
DEFAULT 2023-06-26T16:58:10.197740Z }
DEFAULT 2023-06-26T16:58:10.197743Z }
DEFAULT 2023-06-26T16:58:10.197746Z }
DEFAULT 2023-06-26T16:58:10.197749Z with variables
DEFAULT 2023-06-26T16:58:10.197756Z {'executionParams': {'selector': {'repositoryLocationName': 'data-os@data-pipeline', 'repositoryName': '__repository__', 'pipelineName': 'abi_dev_3_datamart_load', 'solidSelection': <dagster._core.definitions.asset_selection.KeysAssetSelection object at 0x3e2721938f10>}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}
DEFAULT 2023-06-26T16:58:10.197758Z "
Above is the full error message.
Maybe my syntax is wrong. Could you share an example of selecting assets from an asset job using the `submit_job_execution` method?
y
Got it, thanks for the info. Let me see if I can grab someone on the team to help out.
a
Some more information that might help with the diagnosis: I tried passing the asset name as a string in the `op_selection` argument and got a different error:
{
  "textPayload": "INFO:gql.transport.requests:>>> {\"query\": \"mutation ($executionParams: ExecutionParams!) {\\n  launchPipelineExecution(executionParams: $executionParams) {\\n    __typename\\n    ... on InvalidStepError {\\n      invalidStepKey\\n    }\\n    ... on InvalidOutputError {\\n      stepKey\\n      invalidOutputName\\n    }\\n    ... on LaunchPipelineRunSuccess {\\n      run {\\n        runId\\n      }\\n    }\\n    ... on ConflictingExecutionParamsError {\\n      message\\n    }\\n    ... on PresetNotFoundError {\\n      message\\n    }\\n    ... on PipelineRunConflict {\\n      message\\n    }\\n    ... on PipelineConfigValidationInvalid {\\n      errors {\\n        __typename\\n        message\\n        path\\n        reason\\n      }\\n    }\\n    ... on PipelineNotFoundError {\\n      message\\n    }\\n    ... on PythonError {\\n      message\\n    }\\n  }\\n}\", \"variables\": {\"executionParams\": {\"selector\": {\"repositoryLocationName\": \"data-os@data-pipeline\", \"repositoryName\": \"__repository__\", \"pipelineName\": \"abi_dev_3_datamart_load\", \"solidSelection\": \"abi_dev_3_dm_stg_paid_media_mappings\"}, \"runConfigData\": {}, \"mode\": \"default\", \"executionMetadata\": {\"tags\": [{\"key\": \"user\", \"value\": \"Middleware\"}]}}}}"
In the snapshot, you can see the stack trace. I am passing the job name as a string and it is causing a KeyError inside the Dagster library.
y
ah, `submit_job_execution` takes `op_selection` as `Optional[Sequence[str]]`
a
Yeah, I did try to pass `[AssetKey("<key>")]` or `["<key>"]`, i.e. a list of AssetKey or just a list of strings, but it didn't like either.
Error when I passed a list of AssetKey:
"text": "*Error Details/Message:*\nFailed to trigger the job. Error message: Exception occured during execution of query \n\nmutation($executionParams: ExecutionParams!) {\n  launchPipelineExecution(executionParams: $executionParams) {\n    __typename\n\n    ... on InvalidStepError {\n      invalidStepKey\n    }\n    ... on InvalidOutputError {\n      stepKey\n      invalidOutputName\n    }\n    ... on LaunchPipelineRunSuccess {\n      run {\n        runId\n      }\n    }\n    ... on ConflictingExecutionParamsError {\n      message\n    }\n    ... on PresetNotFoundError {\n      message\n    }\n    ... on PipelineRunConflict {\n      message\n    }\n    ... on PipelineConfigValidationInvalid {\n      errors {\n        __typename\n        message\n        path\n        reason\n      }\n    }\n    ... on PipelineNotFoundError {\n      message\n    }\n    ... on PythonError {\n      message\n    }\n  }\n}\n\n with variables \n{'executionParams': {'selector': {'repositoryLocationName': 'data-os@data-pipeline', 'repositoryName': '__repository__', 'pipelineName': 'abi_dev_3_datamart_load', 'solidSelection': [AssetKey(['abi_dev_3_dm_stg_paid_media_mappings'])]}, 'runConfigData': {}, 'mode': 'default', 'executionMetadata': {'tags': [{'key': 'user', 'value': 'Middleware'}]}}}\n""
Here, I am just passing a list of strings for `op_selection`.
o
Hi @Abhishek Agrawal! Sorry for the confusion, but the `submit_job_execution` method does not currently support passing asset selections, although it'd likely be an easy change if you're up for it (basically just passing an `assetSelection` argument through to here: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster-graphql/dagster_graphql/client/client.py?L169). Otherwise, you can execute the GraphQL query manually, with something like https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster-graphql/dagster_graphql/client/client_queries.py, then pass `"assetSelection": [key.to_graphql_input() for key in asset_selection]` into the `"selector"` of your `executionParams`.
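A minimal sketch of the `variables` payload described above, assuming the keys are already known as lists of path components. `build_asset_execution_params` is a hypothetical helper, not part of `dagster_graphql`; the `{"path": [...]}` entries mirror what, as far as I know, `AssetKey.to_graphql_input()` returns; and keeping `pipelineName` (the asset job's name) in the selector is an assumption about what the `launchPipelineExecution` selector still expects.

```python
from typing import Any, Dict, Optional, Sequence


def build_asset_execution_params(
    repository_location_name: str,
    job_name: str,
    asset_key_paths: Sequence[Sequence[str]],
    repository_name: str = "__repository__",
    run_config: Optional[Dict[str, Any]] = None,
    tags: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build `variables` for the launchPipelineExecution mutation.

    Each entry of `asset_key_paths` holds the path components of one asset key,
    e.g. ["my_asset"] or ["prefix", "my_asset"].
    """
    return {
        "executionParams": {
            "selector": {
                "repositoryLocationName": repository_location_name,
                "repositoryName": repository_name,
                # Assumption: the selector still needs the name of the asset job
                # that contains the selected assets.
                "pipelineName": job_name,
                # One AssetKeyInput per asset, shaped like {"path": [...]}.
                "assetSelection": [{"path": list(path)} for path in asset_key_paths],
            },
            "runConfigData": run_config or {},
            "executionMetadata": {
                "tags": [{"key": k, "value": v} for k, v in (tags or {}).items()]
            },
        }
    }


# Usage with the names that appear in the logs earlier in this thread.
variables = build_asset_execution_params(
    repository_location_name="data-os@data-pipeline",
    job_name="abi_dev_3_datamart_load",
    asset_key_paths=[["abi_dev_3_dm_stg_paid_media_mappings"]],
    tags={"user": "Middleware"},
)
```

With a `DagsterGraphQLClient` instance, this dict could then be sent as `client._execute(query, variables)`, where `query` is the `launchPipelineExecution` mutation shown in the logs above. `_execute` is a private method, so its behaviour may change between releases.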
a
@owen, reviving this thread. I finally got around to doing this because a requirement came up. This is what I have (`pipeline_utils_client` is just a wrapper object whose `client` attribute is a `DagsterGraphQLClient`):
pipeline_utils_client = pipeline_utils.PipelineUtilsClass()
asset_selection = [AssetSelection.keys("xxxx_4_untyped_vw")]
run_id = pipeline_utils_client.client._execute(
    query="""
    mutation($executionParams: ExecutionParams!) {
      launchPipelineExecution(executionParams: $executionParams) {
        __typename

        ... on InvalidStepError {
          invalidStepKey
        }
        ... on InvalidOutputError {
          stepKey
          invalidOutputName
        }
        ... on LaunchPipelineRunSuccess {
          run {
            runId
          }
        }
        ... on ConflictingExecutionParamsError {
          message
        }
        ... on PresetNotFoundError {
          message
        }
        ... on PipelineRunConflict {
          message
        }
        ... on PipelineConfigValidationInvalid {
          errors {
            __typename
            message
            path
            reason
          }
        }
        ... on PipelineNotFoundError {
          message
        }
        ... on PythonError {
          message
        }
      }
    }
    """,
    variables={
        "executionParams": {
            "selector": {
                "repositoryLocationName": "test_loc.py",
                "repositoryName": "__repository__",
                "assetSelection": asset_selection,
            }
        }
    },
)
It's giving an error. What am I doing wrong?
o
what's the error?
but at a glance, it looks like you should be passing in `"assetSelection": [key.to_graphql_input() for key in asset_selection]`
a
The error is pretty long, actually, way past the limit of my terminal. I did try what you suggested above:
'KeysAssetSelection' object has no attribute 'to_graphql_input'
I think my `asset_selection` is incorrect:
asset_selection = [AssetSelection.keys("xxxx_4_untyped_vw")]
This returns a `KeysAssetSelection`.
I guess I am not completely understanding how to actually make a selection here. Could you help with an example? Also, if I select assets, do I still need the pipeline name/job name?
o
ah, I see: you want `asset_selection` in that `to_graphql_input` code to be of type `List[AssetKey]`. If you're really just selecting assets by key (and not doing any of the fancier `AssetSelection.keys(...).upstream()` stuff), then I'd recommend just doing `asset_selection = [AssetKey("asset1"), AssetKey("asset2")]`. But if you're using the fancier stuff, then you can resolve your `AssetSelection` object into a list of keys by doing `AssetSelection.keys(...).resolve(defs.get_repository_def().asset_graph)` (where `defs` is your `Definitions` object).
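To make the distinction above concrete, here is a dependency-free sketch of turning asset keys (not `AssetSelection` objects) into the GraphQL inputs. `to_asset_key_inputs` is a hypothetical helper; it duck-types on a `.path` attribute (which is how `AssetKey` exposes its components) instead of importing dagster, and it fails loudly on anything else, such as an unresolved `KeysAssetSelection`, which is what produced the `has no attribute 'to_graphql_input'` error above.

```python
from typing import Any, Dict, Iterable, List


def to_asset_key_inputs(keys: Iterable[Any]) -> List[Dict[str, List[str]]]:
    """Hypothetical helper: turn asset keys into GraphQL AssetKeyInput dicts.

    Accepts bare strings ("my_asset"), lists/tuples of path components
    (["prefix", "my_asset"]), or objects with a `.path` attribute (e.g. AssetKey).
    """
    inputs = []
    for key in keys:
        if isinstance(key, str):
            # A bare string is treated as a single-component key, like AssetKey("my_asset").
            path = [key]
        elif isinstance(key, (list, tuple)):
            path = list(key)
        elif hasattr(key, "path"):
            path = list(key.path)
        else:
            # e.g. an unresolved AssetSelection: resolve it into AssetKeys first.
            raise TypeError(
                f"{type(key).__name__} is not an asset key; resolve AssetSelection "
                "objects into AssetKeys before building the GraphQL input"
            )
        if not path or not all(isinstance(part, str) for part in path):
            raise TypeError(f"invalid asset key path: {path!r}")
        inputs.append({"path": path})
    return inputs
```

With dagster installed, this could be called as `to_asset_key_inputs([AssetKey("asset1"), AssetKey("asset2")])`, or on the result of `AssetSelection.keys(...).resolve(defs.get_repository_def().asset_graph)`; I believe `resolve` returns a set of `AssetKey`, so the output order is not guaranteed in that case.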