Cris
08/18/2020, 6:22 PMdwall
08/18/2020, 6:25 PMBytesStream
to slack as a file:
@solid(
description="A solid to upload a buffered stream to Slack.",
input_defs=[
InputDefinition(
name="stream", dagster_type=BytesStream, description="A buffered stream of a matplotlib figure."
)
],
output_defs=[OutputDefinition(name="slack_file", dagster_type=SlackFile)],
config_schema={
"channels": Field(
config=Array(String),
description="A list of channel names or IDs where the file will be shared.",
is_required=True,
),
"filename": Field(
config=Noneable(String), description="Filename of the shared file.", is_required=False, default_value=None
),
"filetype": Field(
config=Noneable(String), description="Filetype of the shared file.", is_required=False, default_value=None
),
"initial_comment": Field(
config=Noneable(String),
description="The message text introducing the file.",
is_required=False,
default_value=None,
),
"title": Field(
config=Noneable(String), description="Title of the shared file.", is_required=False, default_value=None
),
},
required_resource_keys={"slack"},
tags={"kind": "slack"},
)
def send_stream_to_slack(context, stream: BytesStream):
params = {k: v for k, v in context.solid_config.items() if v is not None}
params.update({"channels": ",".join(context.solid_config["channels"])})
resp = context.resources.slack.files_upload(file=stream, **params)
if not resp["ok"]:
raise Failure(description="Unsuccessful file upload to Slack.")
slack_file = SlackFile.from_resp_object(resp["file"])
yield AssetMaterialization(
metadata_entries=[EventMetadataEntry.url(url=slack_file.permalink, label="Slack File")],
asset_key=slack_file.permalink,
)
yield Output(value=slack_file, output_name="slack_file")
Cris
08/18/2020, 6:29 PMdwall
08/18/2020, 6:30 PM