Ayrton Bourn
12/12/2020, 11:15 AM
I'd like to specify shared config once at the group level, something like this with `composite_solid`:
```python
@composite_solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH'),
        'col_dim': Field(str, is_optional=True, default_value='fueltype'),
    }
)
def update_dataset(context):
    df_stream_latest = get_latest_df(context.solid_config['source'], context.solid_config['stream'])
    da_stream_latest = convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
    save_da_to_zarr(da_stream_latest, context.solid_config['source'], context.solid_config['stream'])

@pipeline
def update_datasets_pipeline():
    update_dataset()
```
However, `composite_solid` can't accept a `config_schema`, which means I have to specify the same variables repeatedly, both in the individual solids and in the pipeline's run-config YAML. E.g.:
```python
@solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH')
    }
)
def get_latest_df(context) -> Any:
    df_stream_latest = update.get_stream_latest_data(context.solid_config['source'], context.solid_config['stream'])
    return df_stream_latest

@solid(
    config_schema={
        'col_dim': Field(str, is_optional=True, default_value='fueltype')
    }
)
def convert_df_to_da(context, df_stream_latest: Any) -> Any:
    da_stream_latest = update.convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
    return da_stream_latest

@solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH')
    }
)
def save_da_to_zarr(context, da_stream_latest: Any):
    update.save_stream_data_to_zarr(da_stream_latest, context.solid_config['source'], context.solid_config['stream'])
```
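The run config then has to repeat the same values for each solid, along these lines:
```yaml
solids:
  get_latest_df:
    config:
      source: BMRS
      stream: FUELHH
  save_da_to_zarr:
    config:
      source: BMRS
      stream: FUELHH
```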
Is there a way to specify these variables only once?
Noah K
12/12/2020, 11:18 AM

Noah K
12/12/2020, 11:19 AM
```python
@resource(config_schema={...})
def asset(ctx):
    return ctx.resource_config
```
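Applied to the example above, that suggestion might look roughly like this (a sketch; the `stream_config` resource key and the logging body are illustrative):
```python
from dagster import Field, ModeDefinition, pipeline, resource, solid

# Declare the shared values once, on a resource.
@resource(config_schema={
    'source': Field(str, is_required=False, default_value='BMRS'),
    'stream': Field(str, is_required=False, default_value='FUELHH'),
    'col_dim': Field(str, is_required=False, default_value='fueltype'),
})
def stream_config(init_context):
    return init_context.resource_config

# Solids read the shared values from the resource instead of their own config.
@solid(required_resource_keys={'stream_config'})
def get_latest_df(context):
    cfg = context.resources.stream_config
    context.log.info(f"fetching {cfg['source']}/{cfg['stream']}")

@pipeline(mode_defs=[ModeDefinition(resource_defs={'stream_config': stream_config})])
def update_datasets_pipeline():
    get_latest_df()
```
The values then appear only once in the run config, under `resources: stream_config: config:`.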
Jeremy H
12/14/2020, 3:42 AM
```
Events:
  Type     Reason     Age                From               Message
  ----     ------     ----               ----               -------
  Normal   Scheduled  69s                default-scheduler  Successfully assigned dagster/dagster-run-db90fb2b-df8a-4bb2-af55-4cedecb91cb7-nc55c to lke10936-18246-5fd2d5d8e086
  Normal   BackOff    36s (x2 over 67s)  kubelet            Back-off pulling image "docker.pkg.github.com/jeremyhermann/docker-repo/dagster:latest"
  Warning  Failed     36s (x2 over 67s)  kubelet            Error: ImagePullBackOff
  Normal   Pulling    24s (x3 over 68s)  kubelet            Pulling image "docker.pkg.github.com/jeremyhermann/docker-repo/dagster:latest"
  Warning  Failed     24s (x3 over 68s)  kubelet            Failed to pull image "docker.pkg.github.com/jeremyhermann/docker-repo/dagster:latest": rpc error: code = Unknown desc = Error response from daemon: Get https://docker.pkg.github.com/v2/jeremyhermann/docker-repo/dagster/manifests/latest: no basic auth credentials
  Warning  Failed     24s (x3 over 68s)  kubelet            Error: ErrImagePull
```
My Helm values.yaml file:
```yaml
dagit:
  nodeSelector:
    lke.linode.com/pool-id: "18034"

postgresql:
  master:
    nodeSelector:
      lke.linode.com/pool-id: "18034"

k8sRunLauncher:
  enabled: true
  env_secrets:
    - "aws-secrets-env"
  nodeSelector:
    lke.linode.com/pool-id: "18386"

userDeployments:
  enabled: true
  deployments:
    - name: "k8s-user-code"
      image:
        repository: "docker.pkg.github.com/jeremyhermann/docker-repo/dagster"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "-f"
        - "training_repo.py"
      replicaCount: 1
      port: 3030
      env:
        ENV_VAR: ""
      env_config_maps:
        - ""
      env_secrets:
        - "aws-secrets-env"
      nodeSelector:
        lke.linode.com/pool-id: "18386"
      affinity: {}
      tolerations: []
      podSecurityContext: {}
      securityContext: {}
      resources: {}

imagePullSecrets:
  - name: dockerconfigjson-github-com

celery:
  enabled: false

rabbitmq:
  enabled: false
```
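The "no basic auth credentials" message means the kubelet pulled the run image without a registry credential, which suggests the run pods aren't receiving the pull secret. One namespace-level workaround (a sketch, assuming the release namespace is `dagster` and the secret already exists there) is to attach the secret to the default service account, so pods that don't list `imagePullSecrets` themselves still inherit it:
```yaml
# Sketch: attach the existing pull secret to the namespace's default
# service account so all pods scheduled with it can pull the private image.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: default
  namespace: dagster
imagePullSecrets:
  - name: dockerconfigjson-github-com
```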
Richard Fisher
12/14/2020, 8:07 AM
Is there a way to change where the scheduler writes its logs, which currently go to `$DAGSTER_HOME/schedules/logs/<schedule_id>/`? I was hoping to be able to do something like this in the dagster.yaml file:
```yaml
scheduler:
  module: dagster_cron.cron_scheduler
  class: SystemCronScheduler
  config:
    base_dir: /var/lib/persistent_storage
```
I’ve set up the event_log_storage, which contains most of the same info, but it would be good to have the schedule logs as well.

Istvan Darvas
12/14/2020, 3:23 PM

sean
12/14/2020, 10:47 PM
When I call `dagstermill.yield_result(my_dataarray, 'name')` at the end of my notebook solid, I get an error -- it appears dagstermill is attempting to serialize the arrays using scrapbook, and this fails.
How do I bypass scrapbook and use my own serialization logic? What I want to do is just keep an in-memory xarray Dataset that stores my arrays. I attempted to do this by creating a custom `asset_store`, but it makes no difference -- dagstermill still tries to use scrapbook when I call `dagstermill.yield_result`. Here is the code I used for the custom asset store; am I perhaps missing something important?
```python
import dagster as dg
import xarray as xr

class DatasetAssetStore(dg.AssetStore):
    def __init__(self):
        super().__init__()
        self.dataset = xr.Dataset()

    def get_asset(self, context):
        name = context.output_name
        return self.dataset[name]

    def set_asset(self, context, obj):
        name = context.output_name
        self.dataset[name] = obj

@dg.resource
def dataset_asset_store(_):
    return DatasetAssetStore()

dataset_mode = dg.ModeDefinition(
    resource_defs={"asset_store": dataset_asset_store}
)

my_pipeline = dg.PipelineDefinition(
    name='test',
    solid_defs=[...],
    dependencies={...},
    mode_defs=[dataset_mode]
)
```
Noah K
12/16/2020, 3:02 AM

Noah K
12/16/2020, 3:03 AM

Nicolas Gaillard
12/16/2020, 10:46 AM

esztermarton
12/16/2020, 2:51 PM

Anatoliy Zhyzhkevych
12/17/2020, 1:53 AM

Xu Zhang
12/17/2020, 6:32 AM
… there's a `Succeeded` signal, but for other events there isn't any, and the waterfall wouldn't stop either. Was this due to the HTTP? Or do you know what kind of info I should provide to you in order to debug this?

Adonis
12/17/2020, 3:24 PM

Frank Dekervel
12/17/2020, 4:16 PM

Oleg Agapov
12/17/2020, 5:42 PM
How can I set an explicit execution order for solids, similar to Airflow's `task1 >> task2 >> task3`? My solids are kinda stateless (don't return any values), but their order is important.
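For reference, the documented Dagster pattern for ordering solids that pass no data is a `Nothing` dependency; a minimal sketch (solid names and bodies are placeholders):
```python
from dagster import InputDefinition, Nothing, pipeline, solid

@solid
def task1(context) -> Nothing:
    context.log.info("task1 side effect")

@solid(input_defs=[InputDefinition("start", Nothing)])
def task2(context) -> Nothing:
    context.log.info("task2 runs after task1")

@solid(input_defs=[InputDefinition("start", Nothing)])
def task3(context):
    context.log.info("task3 runs after task2")

@pipeline
def ordered_pipeline():
    # Nothing inputs carry no data; they only enforce execution order.
    task3(task2(task1()))
```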
Jacolon Walker
12/19/2020, 8:05 AM
```
Error 1: You can only specify a single field at path root:load_from[0]. You specified ['executable_path', 'python_environment', 'target']. The available fields are ['grpc_server', 'python_environment', 'python_file', 'python_module', 'python_package']
```
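The error is saying that each `load_from` entry may contain only one top-level field, so `executable_path` and `target` presumably need to nest under `python_environment` rather than sit beside it; a sketch of that shape (paths are illustrative):
```yaml
load_from:
  - python_environment:
      executable_path: /path/to/venv/bin/python  # illustrative
      target:
        python_file:
          relative_path: repo.py  # illustrative
```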
Jacolon Walker
12/19/2020, 8:06 AM

Noah K
12/19/2020, 8:28 AM

Jacolon Walker
12/19/2020, 8:29 AM

Jacolon Walker
12/19/2020, 8:32 AM

Jacolon Walker
12/19/2020, 8:32 AM

Daniel Kim
12/19/2020, 2:53 PM

Rolf Schick
12/20/2020, 1:12 PM

sk4la
12/20/2020, 7:24 PM
```python
# Prefect example
import logging

import prefect
from prefect import task

@task
def call_a_foreign_function():
    log_from_a_foreign_function()

def log_from_a_foreign_function():
    # Fall back to a plain logger when no Prefect context is available.
    logger = prefect.context.get("logger", logging.getLogger("default_logger_if_no_prefect_context_is_found"))
    logger.info("It works! I can still log to the main logger even if I'm a foreign function!")
```
I tried to do something like this in order to do the equivalent within Dagster:
```python
# Dagster equivalent
import logging

from dagster import solid

@solid
def call_a_foreign_function(context):
    log_from_a_foreign_function()

def log_from_a_foreign_function():
    logging.getLoggerClass()("dagster").warning("Can I speak to the UI like this?")
    logging.getLogger("dagster").warning("Can I speak to the UI like this?")
```
Sadly, it does not seem to work; I'm only able to log to the Dagit console (not the UI).
Does Dagster have a cool way to log to the UI from "outside" of solids?
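One pattern that stays within Dagster's model (a sketch, not necessarily the only way) is to pass the solid's `context.log` into the foreign function explicitly, since that run-scoped logger is the one Dagit displays:
```python
from dagster import solid

def log_from_a_foreign_function(logger):
    # Any logger passed in works; with context.log the message reaches Dagit.
    logger.info("Logging from a foreign function via the solid's logger.")

@solid
def call_a_foreign_function(context):
    # context.log is the run-scoped logger surfaced in the Dagit UI.
    log_from_a_foreign_function(context.log)
```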
Rolf Schick
12/21/2020, 12:25 PM

esztermarton
12/21/2020, 2:27 PM

schrockn
12/21/2020, 5:07 PM