
Jeff Tilton

11/12/2020, 4:45 AM
Hi folks, just starting to play around with dagster and it seems awesome, but I have a question. I saw that there had been an issue for a fully fleshed out lambda execution engine that is now closed. I currently work with a system that executes a lot of python scripts in cron to grab data and put it in our db, but we want to move the majority of these scripts to lambda functions. My question is: is there a "correct" way to invoke a lambda function and maintain it as part of the pipeline? I would be getting the response and then inserting it into our db. I was thinking, at the very least, that if I could get the uuid I see when I execute at the command line, then I could create another pipeline in my lambda function and pass the uuid along. That way, if I put all my logs in a database or elasticsearch, I would be able to piece my pipeline back together and see it that way.
Copy code
2020-11-11 20:29:01 - dagster - DEBUG - inputs_pipeline - 7480db29-cf8b-4e18-b69c-6695a58b6867 <- uuid
Any advice on how to apply serverless to dagster?

sashank

11/12/2020, 5:10 AM
Hey @Jeff Tilton, there are two ways to think about using lambda with Dagster:
1. Writing a lambda executor
2. Calling lambda functions from solids
The first one is more involved, but the second is a lot easier. To call a lambda function from a solid, you can just use the boto3 client.
It would look something like this:
Copy code
import json

import boto3
from dagster import solid


@solid
def my_solid(_):
    client = boto3.client("lambda")
    payload = {...}
    response = client.invoke(
        FunctionName="my-function-name",
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )
    return json.loads(response["Payload"].read())
You can take the response, process it however you like (transform it, upload it to a db, etc.), and pass the result to downstream solids, where you can call other lambda functions.
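For instance, a downstream solid that writes the lambda's response to your db might look like this (just a sketch; the sqlalchemy connection string and table are placeholders for your own setup):
Copy code
import sqlalchemy
from dagster import solid


@solid
def load_to_db(_, records):
    # records is the parsed lambda response passed in from the upstream solid
    engine = sqlalchemy.create_engine("postgresql://user:password@host/db")  # placeholder
    with engine.connect() as conn:
        for record in records:
            conn.execute(
                sqlalchemy.text("INSERT INTO my_table (data) VALUES (:data)"),  # placeholder
                data=str(record),
            )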

Jeff Tilton

11/12/2020, 5:13 AM
Hi @sashank, your second option is what I want to do, but then I kind of lose some of the pipeline. So I was wondering if there is a way to get the uuid that seems to be created for a pipeline and then pass it to another pipeline (up to the lambda function). Then I could put together all of the logs to see the whole transaction.

sashank

11/12/2020, 5:14 AM
By uuid do you mean the dagster pipeline run uuid? You can access that through the context:
Copy code
from dagster import solid


@solid
def my_solid(context):
    run_id = context.pipeline_run.run_id
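If you did want to forward it up to your lambda, you could just include it in the payload (a quick sketch; the "run_id" key is only a convention for your lambda function to read):
Copy code
import json

import boto3
from dagster import solid


@solid
def my_solid(context):
    client = boto3.client("lambda")
    # Tag the payload with the dagster run id so the lambda can include it in its logs
    payload = {"run_id": context.pipeline_run.run_id}
    client.invoke(
        FunctionName="my-function-name",
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )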

Jeff Tilton

11/12/2020, 5:14 AM
Perfect
Is there a way to set the run_id manually?
I would get it from the first pipeline, pass it to the lambda function and set it there as well.

sashank

11/12/2020, 5:16 AM
Set the run id during pipeline execution? No, it’s fixed. You can manually set the run id when creating a run, but I’m not sure why you would want to do that.
But I’m not sure I’m fully following your plan/use case. I have a feeling that you don’t need to pass up the run id, and that using the dagster event log can satisfy your requirements.
Is your plan to call a lambda function from a dagster pipeline, then call another dagster pipeline from the lambda function?

Jeff Tilton

11/12/2020, 5:18 AM
Yes that is my plan
So I want those 2 pipelines to be linked somehow

sashank

11/12/2020, 5:18 AM
Why not do this all in one pipeline, with two solids?
You can pass the result from the first lambda function to the second solid
For example:
Copy code
import json

import boto3
from dagster import pipeline, solid


@solid
def my_solid(_):
    client = boto3.client("lambda")
    payload = {...}
    response = client.invoke(
        FunctionName="my-function-name",
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )
    return json.loads(response["Payload"].read())


@solid
def my_second_solid(_, my_input):
    # Use my_input to build the payload for the second lambda
    client = boto3.client("lambda")
    payload = {...}
    response = client.invoke(
        FunctionName="my-second-function-name",
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )
    return json.loads(response["Payload"].read())


@pipeline
def my_pipeline():
    my_second_solid(my_solid())
Then you would be able to visualize the entire execution within one pipeline run and see all the logs in one place. You can have an arbitrarily complex DAG of lambda functions, and it’s much easier to visualize + do error handling if you do it all in one pipeline.

Jeff Tilton

11/12/2020, 5:23 AM
Except for the logs created in the lambda function, correct? Those would be in cloudwatch. I would be blind to what happens inside the lambda function.
@sashank thanks for the examples. Having the run id was what I really wanted.

sashank

11/12/2020, 5:59 AM
Awesome, let us know if you run into any other issues
Just so you know all the possibilities here, you could pipe the cloudwatch logs into the Dagster event logs
For example, if you know your logs are going to be <4kb, they’re returned in the lambda response when you set LogType="Tail". You could write a lambda resource like this that takes care of logging all the results:
Copy code
import base64
import json

import boto3
from dagster import resource, solid


@resource
def lambda_resource(context):
    client = boto3.client("lambda")

    def invoke(function_name, payload):
        response = client.invoke(
            FunctionName=function_name,
            LogType="Tail",
            InvocationType="RequestResponse",
            Payload=json.dumps(payload),
        )

        # LogResult holds the last 4KB of the invocation's logs, base64-encoded
        context.log.info(base64.b64decode(response["LogResult"]).decode())
        return json.loads(response["Payload"].read())

    return invoke


@solid(required_resource_keys={"lambda_resource"})
def my_solid(context):
    response = context.resources.lambda_resource("my-function", payload={...})
    return ...
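To actually use the resource, you’d attach it to the pipeline through a mode, something like this (assuming the solid above):
Copy code
from dagster import ModeDefinition, pipeline


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"lambda_resource": lambda_resource})]
)
def my_pipeline():
    my_solid()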
Or if the logs are much larger, you could yield an AssetMaterialization that stores a reference to the cloudwatch logs so you can go inspect them as necessary:
Copy code
import json

import boto3
from dagster import AssetMaterialization, EventMetadataEntry, Output, resource, solid


@resource
def lambda_resource(context):
    client = boto3.client("lambda")

    def invoke(function_name, payload):
        response = client.invoke(
            FunctionName=function_name,
            InvocationType="RequestResponse",
            Payload=json.dumps(payload),
        )

        # Generate url to the cloudwatch logs for this function (sketch below)
        url = generate_cloudwatch_log_url(function_name)
        materialization = AssetMaterialization(
            asset_key="lambda_logs_{}".format(function_name),
            metadata_entries=[EventMetadataEntry.url(url, label="CloudWatch Logs")],
        )

        # Hand the materialization back to the solid, since events have to be
        # yielded from the solid itself to land in the event log
        return materialization, json.loads(response["Payload"].read())

    return invoke


@solid(required_resource_keys={"lambda_resource"})
def my_solid(context):
    materialization, result = context.resources.lambda_resource("my-function", payload={...})
    yield materialization
    yield Output(result)
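The generate_cloudwatch_log_url helper is left unimplemented above; one rough way to write it (a sketch, and the console URL format is an assumption worth double-checking, since AWS changes it over time) is to just link to the function’s log group:
Copy code
def generate_cloudwatch_log_url(function_name, region="us-east-1"):
    # Assumption: the function logs to the standard /aws/lambda/<function-name> log group
    log_group = "/aws/lambda/{}".format(function_name)
    return (
        "https://console.aws.amazon.com/cloudwatch/home"
        "?region={}#logEventViewer:group={}".format(region, log_group)
    )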

Jeff Tilton

11/12/2020, 4:34 PM
This is great @sashank, I really appreciate you taking the time to show this. We are just starting to explore our transition, so I did not know that the logs come back in the response as well. This is extremely helpful.

sashank

11/24/2020, 7:37 PM
Hey @Jeff Tilton, just wanted to follow up here. Were you able to get everything working?