# ask-community
n
Hi, new to dagster and trying to learn more about software-defined data assets. One thing I am not sure I quite understand is how passing data as input to an asset works. It appears as though the data is passed to the asset in memory after being de-pickled, but what if that data is too large to fit in memory? Suppose I have a dataset of 1M images (an array of s3 URLs) that I want to apply some transformation to. Would I pass this array of s3 URLs into my asset, and load each image from s3 in the asset itself? It feels like this goes against the notion of separating IO from business logic. Would the right move be to partition the 1M images?
Furthermore, suppose you want to implement an asset which trains a model, and the images live in s3: how would you implement that? It would be cool to see an example of model training with a lot of data, and how that's implemented in this new data asset paradigm.
One other question: can you have a graph of assets, where one asset might run inside of a docker container from image A, and another runs inside of a docker container from image B? Is that currently possible?
s
The asset "output" can be anything you want, including just metadata about the asset itself. For example, we use assets to orchestrate Sagemaker training / model / endpoint / processing "assets", but we don't actually have Dagster passing along the actual images, model artifatcts, etc. Instead it basically passes around pointers to those objects, which the next asset can use. for example:
import datetime

import boto3
from dagster import asset

@asset
def training_job():
    todays_job = datetime.datetime.now().strftime("%Y-%m-%d")
    client = boto3.client("sagemaker")
    # kick off the Sagemaker training job and wait for it to finish
    # (create_and_poll_training_job / other_params are shorthand, not literal boto3 calls)
    client.create_and_poll_training_job(TrainingJobName=todays_job, **other_params)
    # hand downstream assets the job's metadata (pointers to artifacts), not the artifacts themselves
    details = client.describe_training_job(TrainingJobName=todays_job)
    return details

@asset
def model_version(training_job: dict):
    client = boto3.client("sagemaker")
    # register a model version pointing at the artifact produced by the upstream asset
    artifact = training_job["artifact_s3_uri"]
    client.create_model(name="my_model", artifact=artifact)
    details = client.describe_model("my_model")
    return details
So in this case, I'm not really using custom io_managers at all, more just communicating and versioning the jobs being launched. I called this a "weak assets" approach here: https://dagster.slack.com/archives/C01U954MEER/p1663585111253899?thread_ts=1663445082.044809&cid=C01U954MEER
Wrt multi-image asset graphs, one approach you can take is to use k8s_job_op to have the work launched as its own Docker image (or some similar approach where the Dagster op is actually launching a secondary job somewhere else to do the work -- this is how Sagemaker works, for example). You can also link together assets from two different code locations using SourceAsset, although for the case you mention above that's probably not best practice.
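For a rough idea, a k8s_job_op launch can look something like this (a minimal sketch assuming dagster-k8s is installed; the image name and command are placeholders):

from dagster import job
from dagster_k8s import k8s_job_op

# Run the heavy lifting inside its own image; Dagster just launches and monitors the k8s Job.
train_in_own_image = k8s_job_op.configured(
    {
        "image": "my-training-image:latest",  # placeholder image name
        "command": ["python", "-m", "train"],  # placeholder command
    },
    name="train_in_own_image",
)

@job
def training_in_k8s():
    train_in_own_image()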
n
Ah interesting. It's possible to use dagster for running the actual training code, right, if we don't want to use Sagemaker? I think the notion of passing around metadata with pointers to the objects makes sense. Not super clear on the k8s_job_op just yet, but that sounds like Kubernetes, right? If you are running a small single-machine deployment, let's say, but want to run different ops/assets on different Docker images, is this currently possible without Kubernetes?
It looks like you can specify a Docker image on a job: https://docs.dagster.io/_apidocs/libraries/dagster-docker#dagster_docker.DockerRunLauncher. Does this also work for assets?
Can you specify the same such docker executor on an asset?
Another thing I'm not quite clear on is whether a job can be called from within an asset.
s
An asset is a special class of an op, so you can't call other jobs unless you hit the GraphQL API from within the job, afaik.
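If you really do need to trigger another job from inside an asset, the GraphQL Python client is one way to do it; roughly something like this (a sketch, with the host, port, and job name as placeholders):

from dagster_graphql import DagsterGraphQLClient

# Point the client at the Dagster webserver / dagit GraphQL endpoint (placeholder host/port).
client = DagsterGraphQLClient("localhost", port_number=3000)

# Submit a run of another job by name and get back the new run's id.
run_id = client.submit_job_execution("my_other_job")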
My mental model is that the constraint you operate under is that your Code Location can only define ops within a single Docker image, so you can't do something like:
# all defined in code location 1

@asset(executor_def=image_1)
def my_first_model():
    model.fit()
    return model

@asset(executor_def=image_2)
def my_second_model():
    model.fit()
    return model
Because both of those would be launched from the Code Location 1 image daemon (which could be Docker or Kubernetes). But you could do something like:
# all defined in code location 1

@asset
def my_first_model():
    # pseudocode: launch the real work in its own container/image...
    docker_run_launcher.run_image(image_1)
    # ...and only hand Dagster back metadata about what was produced
    metadata = fetch_metadata()
    return metadata

@asset
def my_second_model():
    docker_run_launcher.run_image(image_2)
    metadata = fetch_metadata()
    return metadata
But I'm going to confess I'm hitting my limits of understanding here.
n
What is docker_run_launcher? There doesn't appear to be much Docker-based documentation/examples/tutorials on the docs site.
Oh, I think I get it: you are suggesting to call docker run via the API from within an asset.
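E.g. something like this with the Docker SDK for Python (just my rough sketch of what I think you mean; the image name is made up):

import docker
from dagster import asset

@asset
def my_first_model():
    # run the training container to completion, then only keep metadata in Dagster
    client = docker.from_env()
    logs = client.containers.run("my-training-image:latest", remove=True)  # made-up image
    return {"image": "my-training-image:latest", "logs": logs.decode()}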
That's an interesting idea, but I was hoping there's a way to specify the Docker image inside which your actual asset runs, imagine something like:
@asset(docker_image = 'hello-world')
Context: I have pipelines that have pretty different and complicated environments. For example, I might have an op/asset that needs to run some C++ inference code, and then another one that runs some Python code, so I can't be running these ops all within the same Docker container.
c
Hi Nadav. Apologies about the late response, but I think you can run each asset in a different Docker image if you are using the k8s_job_executor or celery_k8s_job_executor: https://dagster.slack.com/archives/C01U954MEER/p1644335565408589?thread_ts=1644334246.680049&cid=C01U954MEER
Not entirely sure if this works, but I think on your @asset you would specify op_tags instead:
@asset(op_tags={...})
with the code snippet in Johann's response
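For reference, per-op Kubernetes config via op_tags looks roughly like this (a sketch I haven't verified; the image name is a placeholder):

from dagster import asset

# With the k8s_job_executor each step runs in its own pod, and this tag
# overrides the container image used for that step's pod.
@asset(
    op_tags={
        "dagster-k8s/config": {
            "container_config": {"image": "my-other-image:latest"},  # placeholder image
        }
    }
)
def my_second_model():
    ...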