# ask-community
d
Issues with config. Getting `Missing required config entry "config" at the root` issues.
a
Adding the code in the thread is preferred for the reason you mentioned. Also, using the `code block` formatting option is really good when it comes to sharing code.
d
Thx @Abhishek Agrawal... just changed the Slack preferences to get the code formatting to be less wonky. Hope it works better now. For the following code:
```python
from dagster import (
    asset,
    job,
    op,
    build_op_context,
    DynamicOut,
    DynamicOutput,
    StringSource,
)

@asset(config_schema={"user_scope": StringSource})
def doc_ids_from_gdrive(context) -> list[str]:
    return ["doc-1", "doc-2", "doc-3"]

@op(config_schema={"user_scope": StringSource}, out=DynamicOut())
def get_doc_ids_from_gdrive():
    context = build_op_context()
    ids = doc_ids_from_gdrive(context)
    user_scope = context.op_config["user_scope"]
    for doc_id in ids:
        print(f"Yield {doc_id}")
        yield DynamicOutput(doc_id, mapping_key=f"gdrive-doc.{user_scope}.{doc_id}")

@job
def get_text_from_gdrive_doc_ids() -> list[str]:
    doc_ids = get_doc_ids_from_gdrive()
    print(f'Dealing with docs:[{doc_ids}]')
    # TODO Something real
    return []
```
I get the following error:
```
dagster._core.errors.DagsterInvalidConfigError: Error in config for op
    Error 1: Missing required config entry "config" at the root. Sample config for missing entry: {'config': {'user_scope': '...'}}
```
when using the following config:
```yaml
ops:
  get_doc_ids_from_gdrive:
    config:
      user_scope: test-user
  text_from_gdrive_doc:
    config:
      user_scope: test-user
```
Upon attempting to append the following to the config (which I can already guess wouldn't comply with the schema):
```yaml
config:
  user_scope: test-user
```
The error reported changes to:
```
Received unexpected config entry "config" at the root. Expected...
```
This confuses me, since Dagster first complains about a missing config entry at the root, but when we do supply it, Dagster doesn't quite seem to be happy. What am I missing?
e
Can you try converting that asset into an op?
d
Upon trying I get
```
dagster._core.errors.DagsterInvalidDefinitionError: @job 'get_text_from_gdrive_doc_ids' returned problematic value of type <class 'list'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
```
so now I'll reset and try again from the doc examples, as I'm guessing my use of `DynamicOut` is the problem.
c
I think this might be a result of the use of `build_op_context` here - essentially you need to pass the config directly to `build_op_context`.
On another note, I'd discourage the invoke-asset-within-op pattern used here, as dagster won't be able to accurately track the execution of the asset - what's the intended design?
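On the first point, a minimal sketch of what passing the config through `build_op_context` could look like, reusing the `user_scope` key from the snippet above:
```python
from dagster import build_op_context

# Supply the op config when building the context; without it, the
# invoked asset sees no config and Dagster raises the "Missing
# required config entry" error from above.
context = build_op_context(op_config={"user_scope": "test-user"})
```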
d
Thanks for this @chris, I've reduced the entire setup to remove any use of configs for now and am rethinking our approach. Use case:
1. Get all document ids for a `user` (maybe materialize an asset here), where `user` is passed along as a job param (probably through a GraphQL API call)
2. Retrieve content for every document id (maybe materialize an asset here)
3. Perform some computation/processing on every bit of content
4. Finally, perform some computation on the aggregate (maybe produce another asset)

I've been informed that I may want to trigger this job through the GraphQL API and provide the user param upon invocation to get things started in a more push-style fashion, but I must be misunderstanding how to use `ConfigSchema` correctly (along with assets and jobs). My initial mistake may have been trying to wrap everything into assets too soon. Maybe I should just write the ops, since they are just basic funcs (without any fluff), and then think about where asset materializations really make sense and whether they fit within the use case for which the asset decorator is made available. Got the pipeline to work by reducing the lot to:
```python
from dagster import (
    job,
    op,
    DynamicOut,
    DynamicOutput,
)
import re


def sanitize(input_string):
    return re.sub(r"[^A-Za-z0-9_]", "", input_string)


@op()
def doc_ids_from_gdrive():
    # TODO: Get document ids from gdrive
    # context.resources.gdrive_client
    # TODO: Populate asset by persisting retrieved ids
    return ["doc-1", "doc-2", "doc-3"]


@op(out=DynamicOut())
def get_doc_ids_from_gdrive():
    ids = doc_ids_from_gdrive()
    user_scope = "scope-00"
    for doc_id in ids:
        sanitized_doc_id = sanitize(doc_id)
        sanitized_user_scope = sanitize(user_scope)
        mapping_key = f"gdrive_{sanitized_user_scope}_{sanitized_doc_id}"
        print(f"Yield doc:{doc_id}")
        yield DynamicOutput(doc_id, mapping_key=mapping_key)


@op
def get_text_for_gdrive_doc_id(doc_id):
    print(f"Got text for doc:{doc_id}")
    return f"Document {doc_id}"


@job
def get_text_from_gdrive_doc_ids():
    # TODO: Retrieve doc given scope and doc_id
    # TODO: Persist into database
    doc_ids = get_doc_ids_from_gdrive()
    doc_ids.map(get_text_for_gdrive_doc_id)
```
but I still have to figure out A) how to pass some params into the job and B) whether you think this is architecturally "misusing Dagster". For example, should I define `doc_ids_from_gdrive` (and later, `encrypted_text_from_gdrive`) as assets instead?
P.S.: During dev with dagit, I find it rather cumbersome to reload the defs and then go through Deployments > Jobs > Launchpad (potentially update config) > Launch Run. Is there a faster DX to fire up a job? Would https://docs.dagster.io/concepts/configuration/config-schema#command-line be what you'd go for?
c
Regarding the dagit feedback - yeah, fast / auto-reloading of dagit to make that flow less cumbersome is definitely on the roadmap. In the meantime, you could potentially use the python API to run your job from a script via `job_def.execute_in_process`.
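A minimal sketch of such a script (the import path is hypothetical - point it at wherever the job is defined):
```python
# run_job.py - quick local iteration without clicking through dagit
from jobs import get_text_from_gdrive_doc_ids  # hypothetical import path

if __name__ == "__main__":
    # Runs the job synchronously in the current process
    result = get_text_from_gdrive_doc_ids.execute_in_process()
    assert result.success
```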
Regarding the architecture, I think it mostly makes sense, with a few things that could be improved:
• Calling `doc_ids_from_gdrive` within `get_doc_ids_from_gdrive` is considered bad practice - ops invoked within other ops don't get treated as part of the DAG, they just get invoked as regular python functions, basically (we should do a better job of pointing people in the right direction here; I had a PR to error when folks do this but never landed it 😅)
◦ As a result, I'd recommend just turning `doc_ids_from_gdrive` into a regular python function. I'm not sure it's necessary to have two separate ops for what's happening there, but I could be missing something
• It sounds like you want this job to run on a per-user basis, and basically be triggered ad-hoc via graphql for a particular user; given that, I think it makes sense to use config to represent the user
Here’s what I’m imagining things looking like based on the changes I just described:
```python
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, job, op

# sanitize() is the same helper as in the snippet above


def doc_ids_from_gdrive(user):  # Notice how this isn't an op
    ...


@op(out=DynamicOut(), config_schema={"user": str})
def get_doc_ids_from_gdrive(context: OpExecutionContext):
    ids = doc_ids_from_gdrive(context.op_config["user"])
    user_scope = "scope-00"
    for doc_id in ids:
        sanitized_doc_id = sanitize(doc_id)
        sanitized_user_scope = sanitize(user_scope)
        mapping_key = f"gdrive_{sanitized_user_scope}_{sanitized_doc_id}"
        print(f"Yield doc:{doc_id}")
        yield DynamicOutput(doc_id, mapping_key=mapping_key)


@op
def get_text_for_gdrive_doc_id(doc_id):
    print(f"Got text for doc:{doc_id}")
    return f"Document {doc_id}"


@job
def get_text_from_gdrive_doc_ids():
    # TODO: Retrieve doc given scope and doc_id
    # TODO: Persist into database
    doc_ids = get_doc_ids_from_gdrive()
    doc_ids.map(get_text_for_gdrive_doc_id)
```
Then your config would look almost identical to what you originally provided:
```yaml
ops:
  get_doc_ids_from_gdrive:
    config:
      user: test-user
```
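And since triggering ad-hoc via GraphQL came up, a rough sketch of submitting a run for a particular user with the `dagster-graphql` python client (the host and port here are assumptions about your deployment):
```python
from dagster_graphql import DagsterGraphQLClient

# Assumes dagit is reachable at localhost:3000
client = DagsterGraphQLClient("localhost", port_number=3000)

# Submit a run of the job with the per-user config
run_id = client.submit_job_execution(
    "get_text_from_gdrive_doc_ids",
    run_config={"ops": {"get_doc_ids_from_gdrive": {"config": {"user": "test-user"}}}},
)
print(f"Launched run {run_id}")
```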