madhurt
06/29/2021, 7:46 PM
ImportError: cannot import name 'EventLogEntry' from 'dagster.core.events.log' (/usr/local/lib/python3.7/site-packages/dagster/core/events/log.py)
Michel Rouly
06/29/2021, 9:17 PM
A question about fan-in solids and failures before the fan-in.
Scenario: solids A and B execute and yield output into solid fan_in. Normally they both succeed, and fan_in receives both inputs.
In this case, solid B fails. I hit 'retry from failure', and B now succeeds, but fan_in only receives input from B. I would have expected fan_in to receive input from B and also the previously successful A. But that doesn't happen.
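For reference, a minimal sketch of the scenario being described, using the list-input fan-in pattern (solid names and bodies are placeholders):
from typing import List

from dagster import pipeline, solid


@solid
def solid_a(_) -> int:
    return 1


@solid
def solid_b(_) -> int:
    return 2


@solid
def fan_in(_, inputs: List[int]) -> int:
    # receives the outputs of both upstream solids as a single list
    return sum(inputs)


@pipeline
def my_pipeline():
    fan_in([solid_a(), solid_b()])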
Dean Verhey
06/29/2021, 10:04 PM
We currently use Airflow's DockerOperator to pull an image from ECR (built and pushed as part of another library's CI/CD process) and then execute commands on that image like so:
...
task_1 = DockerOperator(
    task_id="task_1",
    dag=dag,
    docker_conn_id=DOCKER_CONN_ID,
    api_version=DOCKER_API_VERSION,
    auto_remove=True,
    image=IMAGE,
    volumes=VOLUMES,
    command=f"our_library run --start {start_date} --end {end_date}",
)
...
task_1 >> task_2 # etc. etc. etc.
I've been poking around the documentation for dagster-docker and its DockerRunLauncher class, but that doesn't seem immediately analogous to Airflow's DockerOperator. Is that accurate, or am I missing something?
Alternatively: is that even a pattern you'd recommend with Dagster? We mostly used it for a) libraries with challenging dependency requirements that we didn't want clashing, and b) infrequent tasks like running monthly reports where the extra overhead of pulling from a container registry wasn't a big deal. But I'm open to other ideas or whatever the dagster-thonic way of doing things is.
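One way to approximate the DockerOperator pattern inside a single solid, shown here as a hedged sketch using the docker Python SDK (the image and command values are placeholders, and this is not the dagster-docker API):
import docker
from dagster import solid


@solid(config_schema={"image": str, "command": str})
def run_in_container(context) -> None:
    # run the configured image and surface its logs, roughly what
    # Airflow's DockerOperator does for a single task
    client = docker.from_env()
    logs = client.containers.run(
        context.solid_config["image"],
        context.solid_config["command"],
        remove=True,
    )
    context.log.info(logs.decode())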
Polys
06/30/2021, 7:36 AM
Is there a way to set imagePullSecrets for the k8sRunLauncher?
We are in a situation where:
• everything works OK when a dagster-run-... pod happens to get scheduled on the same node as dagster-user-deployment-..., because the image is cached, but
• it fails to pull the image on nodes that don't already have it, because the run pod is missing pull secrets.
Many thanks in advance :)
laborgo
06/30/2021, 9:09 AM
Ritasha Verma
06/30/2021, 11:38 AM
madhurt
06/30/2021, 5:26 PM
Right now we access dagit via port-forward:
export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
  -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
  -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
To have a permanent address instead, what should be done? Do I create a service for it? If yes, what could be the steps? I would appreciate some sense of direction. Do I need to edit values.yaml too (the one we use with helm)?
Michel Rouly
06/30/2021, 6:01 PM
Is it possible to pass additional inputs to a solid used with .map? Said another way, is there support for a solid with multiple parameters being passed into .map?
e.g.
@solid
def files_in_directory(): pass

@solid
def process_file(path, other_input): pass

@pipeline
def my_pipeline():
    files = files_in_directory()
    other_input = some_other_solid()
    files.map(lambda path: process_file(path, other_input)).collect()
In this example, process_file takes the path parameter from .map, but it also takes other_input from another, unrelated solid.
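A fuller sketch of the shape in question, using the dynamic output API (import paths may vary slightly by version; all names here are illustrative):
from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid


@solid(output_defs=[DynamicOutputDefinition(str)])
def files_in_directory(_):
    for idx, path in enumerate(["a.txt", "b.txt"]):
        yield DynamicOutput(path, mapping_key=str(idx))


@solid
def some_other_solid(_) -> str:
    return "shared value"


@solid
def process_file(_, path: str, other_input: str) -> str:
    return f"{other_input}: {path}"


@pipeline
def my_pipeline():
    other_input = some_other_solid()
    # the lambda closes over other_input, so each mapped invocation of
    # process_file gets both the dynamic path and the shared input
    files_in_directory().map(lambda path: process_file(path, other_input))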
Jake Beresford
06/30/2021, 8:55 PM
madhurt
06/30/2021, 9:39 PM
We are using the k8sRunLauncher. It creates separate jobs for each pipeline run. Now, just as one can assign pods to a specific worker node using the nodeSelector, affinity, or tolerations properties: can I assign each job created by the k8sRunLauncher to a specific node? I don't see these properties for the k8sRunLauncher in the .Values provided by the helm chart, but I do see them for the celeryK8sRunLauncher. Does that mean I need to switch to celeryK8sRunLauncher?
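One pattern that may help, if memory serves (worth verifying against the dagster-k8s docs for your version): per-pipeline Kubernetes config can be attached via tags, along these lines (the key names and selector values below are assumptions, not confirmed):
from dagster import pipeline


@pipeline(
    tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                # assumed example: pin run pods/jobs to nodes labeled this way
                "node_selector": {"dagster-workload": "pipelines"},
            },
        },
    }
)
def my_pipeline():
    ...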
Andrew Parsons
07/01/2021, 2:30 AM
I'm trying to use spaCy's nlp.pipe in one of my solids. This solid will run alone (as in, there will be no concurrently running solids).
For clarity, this is unrelated to the ability to run multiple solids concurrently using the DynamicOutput pattern; I've successfully achieved that elsewhere in my pipeline.
On spaCy (skip if you're familiar with the library)
If you aren't familiar, spaCy (correctly stylized) is a, if not the, leading NLP library. Canonically, strings are converted to Doc objects by passing a string to an nlp object's __call__ method.
Performance improvements can be had by replacing plain iteration with `nlp.pipe(texts, n_process: int)`:
from typing import Generator, List

import spacy
from spacy.language import Language
from spacy.tokens import Doc

nlp: Language = spacy.load('model_name')
texts: List[str] = ...

# slower
for text in texts:
    doc: Doc = nlp(text)

# far faster
pipe: Generator = nlp.pipe(texts, n_process=16)
• nlp.pipe's usage documentation
• nlp.pipe's API documentation
My problem
I seek to pump hundreds of millions of sentences through spaCy. This works outside of Dagster just fine. However, I'm finding that invoking nlp.pipe(...) within a Dagster solid results in an extreme slowdown.
If I feed nlp.pipe() a trivial amount of texts, it completes in ~3 seconds when n_process=1 and in more than 120 seconds when n_process > 1. I've monitored htop whilst the Dagster pipeline runs and can see one CPU running at ~95% and the rest hovering around 1-2%. Every so often there is a jump among the others to about 10%, but honestly that might just be some other background process.
This AWS EC2 instance has 16 vCPUs, so I typically set n_process=15. If I run this same block of code outside of Dagster (using IPython, for example), I can see all 15 vCPUs hit 100% usage.
I'm aware that nlp.pipe(...) is a generator function. I have confirmed that I am evaluating all the results in both cases.
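For concreteness, a stripped-down sketch of the kind of solid in question (the model name, n_process value, and timing wrapper are illustrative, not the actual code):
import time
from typing import List

import spacy
from dagster import solid


@solid
def parse_texts(context, texts: List[str]) -> float:
    # illustrative repro: time nlp.pipe inside a solid so the in-Dagster number
    # can be compared against the same call in a plain script or IPython
    nlp = spacy.load("model_name")  # placeholder model name
    start = time.perf_counter()
    docs = list(nlp.pipe(texts, n_process=15))
    elapsed = time.perf_counter() - start
    context.log.info(f"processed {len(docs)} docs in {elapsed:.1f}s")
    return elapsed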
What I am asking for
• is it possible to use parallelization from within a single solid?
• ...or is this expected behavior?
• surely this is a Dagster problem and not a spaCy problem, right?
• what might act as a bottleneck?
• is something wrong with my configuration?
To the extent that it matters, I have the following in my configuration:
execution:
  multiprocess:
    config:
      max_concurrent: 15
Here's an excerpt from my `dagster.yaml`:
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher
I am running the multi-container Docker deployment as described in the documentation.
Otherwise, Dagster has been a pleasure to use thus far ❤️ Thanks in advance.
Ronak Jain
07/01/2021, 6:05 AM
Maitiú Ó Ciaráin
07/01/2021, 8:29 AM
We're running Dagster (0.11.14) with CeleryK8s and so far it's been great, but I'm starting to push it to ~300 concurrent solids (20 workers on 16-core machines) and seeing some odd behaviour. Mainly, jobs finish successfully according to dagster events and the pod logs, but dagster seems to think they have neither succeeded nor failed (see screenshot attached).
Has anyone seen something like this, or have any suggestions?
Alessandro Marrella
07/01/2021, 1:35 PM
The dbt solids' input type is Nothing, while the output type for the cli run is DbtCliOutput. This creates a type error when trying to sequence dbt solids. Would it be possible to change the input type to Any? Or is there another pattern you suggest?
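For illustration, one workaround shape that comes to mind (a hedged sketch, not an endorsed pattern; the adapter name is made up): a pass-through solid that accepts the upstream output and exposes only a Nothing output, so downstream solids can depend on it purely for ordering.
from dagster import InputDefinition, Nothing, OutputDefinition, solid


@solid(
    input_defs=[InputDefinition("upstream_result")],
    output_defs=[OutputDefinition(Nothing, "done")],
)
def dbt_done(_, upstream_result):
    # discard the DbtCliOutput; "done" exists only to sequence the next solid
    pass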
Dan Zhao
07/01/2021, 9:31 PM
werkzeug.routing.WebsocketMismatch: 400 Bad Request: The browser (or proxy) sent a request that this server could not understand.
Does anyone have a clue why this might happen?
pdpark
07/01/2021, 11:02 PM
John Faucett
07/02/2021, 12:16 PM
George Pearse
07/02/2021, 12:23 PM
Jonathan Kupcho
07/02/2021, 1:27 PM
Noah Sanor
07/02/2021, 3:27 PM
I'm running into this error:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses" debug_error_string = "{"created":"@1625239477.655628200","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3008,"referenced_errors":[{"created":"@1625239477.655621700","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}" >
Code and more details in 🧵.
Guillaume Jeusel
07/03/2021, 6:40 AM
Currently I instantiate the resource in my tests by calling resource.resource_fn(None). Am I missing something?
Also, ideally, before any other test starts, this mock should be instantiated again (to avoid side effects).
Maybe one solution might be to permit adding mode definitions to a PipelineDefinition after instantiation? That way, I would have a fixture adding the "unittest" mode to my pipelines, with the resources' mocked values being other fixtures usable inside my tests.
I don't really see the point of having a "unittest" mode defined in my production code, although I do see the benefit of having a mode that mocks specific resources, so as to avoid changing any system state and just let the user test the "Extract" and "Transform" parts.
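For illustration, a minimal sketch of the kind of mock-resource mode being described, assuming a resource key my_resource (this still has to be attached to the pipeline at definition time, which is the limitation in question):
from unittest import mock

from dagster import ModeDefinition, ResourceDefinition

# a test-only mode whose resource is a plain MagicMock
unittest_mode = ModeDefinition(
    name="unittest",
    resource_defs={
        "my_resource": ResourceDefinition.hardcoded_resource(mock.MagicMock()),
    },
)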
Daniel Kim
07/03/2021, 9:44 PM
A small feature request about default values set via the Field() class: it would be nice to be able to reveal or show what the default values are. For example, perhaps dagit's Playground would load or show the following:
resources:
  values:
    config:
      start_date_yyyymmdd: <today's date>
      end_date_yyyymmdd: <tomorrow's date>
The justification for this is that the person executing the pipeline may not know what the default values are. As a workaround, I could mention the default values in the solids' docstrings or in the solid definition's description= parameter.
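For reference, a sketch of the kind of definition this refers to, with defaults declared on Field (the resource and field names follow the snippet above; the date values are placeholders):
from dagster import Field, resource


@resource(
    config_schema={
        "start_date_yyyymmdd": Field(str, default_value="20210706"),
        "end_date_yyyymmdd": Field(str, default_value="20210707"),
    }
)
def values(init_context):
    # expose the (possibly defaulted) config values to solids
    return init_context.resource_config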
Jazzy
07/04/2021, 12:11 PM
from dagster import solid
from dagster_dbt import dbt_cli_run

@solid(config_schema={"profiles-dir": str, "project-dir": str})
def run_dbt_models(context):
    config_test = {
        "profiles-dir": context.solid_config["profiles-dir"],
        "project-dir": context.solid_config["project-dir"],
    }
    dbt_cli_run.configured(
        config_or_config_fn=config_test,
        name="run_dbt_models",
    )
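For comparison, a hedged sketch of calling .configured() at definition time rather than inside the solid body (the paths are placeholders; .configured() returns a new, pre-configured solid definition rather than executing anything):
from dagster import pipeline
from dagster_dbt import dbt_cli_run

# build a pre-configured copy of the dbt_cli_run solid
run_dbt_models = dbt_cli_run.configured(
    {"profiles-dir": "/path/to/profiles", "project-dir": "/path/to/dbt_project"},
    name="run_dbt_models",
)


@pipeline
def dbt_pipeline():
    run_dbt_models()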
Matt Delacour
07/05/2021, 11:57 AM
Suppose I have a solid like this:
@solid
def group_by_keys(
    _,
    df: DataFrame,
    groupby_map: Dict[str, Dict[str, str]],
    groupby_keys: List[str],
) -> DataFrame:
    ...
@pipeline
def my_pipeline():
    ...
    df = group_by_keys(
        df=df,
        groupby_keys=["product_id", "happened_on"],
        groupby_map={
            "stock_beginning_of_day": {"stock_beginning_of_day": "arbitrary"},
            "stock_end_of_day": {"stock_end_of_day": "arbitrary"},
            "number_of_variation_during_day": {"sync_datetime": "count"},
        },
    )
In that case, all the parameters passed to the solid group_by_keys need to be the output of another solid. This means I need to create extra solids in my pipeline just to output groupby_keys and groupby_map, which makes the code very verbose IMO.
Another solution would be to have group_by_keys as a plain Python function, but that would mean creating solids with unique names because of the whole namespace concept:
@solid
def my_pipeline_groupby_ops(_, df: DataFrame) -> DataFrame:
    return group_by_keys(
        df=df,
        groupby_keys=["product_id", "happened_on"],
        groupby_map={
            "stock_beginning_of_day": {"stock_beginning_of_day": "arbitrary"},
            "stock_end_of_day": {"stock_end_of_day": "arbitrary"},
            "number_of_variation_during_day": {"sync_datetime": "count"},
        },
    )
So my question is: what is the best approach to make group_by_keys easy to reuse across pipelines?
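For illustration, one more possible shape (a hedged sketch, not necessarily the recommended pattern): keep the static arguments as solid config instead of upstream outputs, so the solid stays reusable across pipelines.
from dagster import Permissive, solid


@solid(config_schema={"groupby_keys": [str], "groupby_map": Permissive()})
def group_by_keys(context, df):
    keys = context.solid_config["groupby_keys"]
    groupby_map = context.solid_config["groupby_map"]
    # the real aggregation would use groupby_map; simplified to a placeholder here
    return df.groupby(keys, as_index=False).first()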
Thanks in advance 🙏
Antti Soininen
07/05/2021, 12:15 PM
I'm iterating over reexecute_pipeline_iterator() in a for loop. Is there a way to access a solid's output value once the iterator yields a STEP_OUTPUT event? Or, more generally, how does one access solids' output values when using (re)execute_pipeline_iterator()?
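For context, roughly the shape of the loop in question (the pipeline, run id, and instance handling below are placeholders, not the actual code):
from dagster import DagsterInstance, pipeline, reexecute_pipeline_iterator, solid


@solid
def emit(_) -> int:
    return 1


@pipeline
def my_pipeline():
    emit()


instance = DagsterInstance.get()

# stream events from re-executing an earlier run; "<parent run id>" is a
# placeholder for a real run id known to the instance
for event in reexecute_pipeline_iterator(
    my_pipeline, parent_run_id="<parent run id>", instance=instance
):
    if event.event_type_value == "STEP_OUTPUT":
        print(event.step_key)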
Chris Le Sueur
07/05/2021, 2:03 PM
We're hitting an error on the Playground tab for the pipeline; it is the following:
Error: TypeError: Cannot read property '__typename' of undefined
at u (ConfigTypeSchema.tsx:73)
at ConfigTypeSchema.tsx:91
at Array.map (<anonymous>)
at u (ConfigTypeSchema.tsx:84)
at ConfigTypeSchema.tsx:147
at oi (react-dom.production.min.js:157)
at Bi (react-dom.production.min.js:180)
at Ii (react-dom.production.min.js:178)
at Pi (react-dom.production.min.js:177)
at Fa (react-dom.production.min.js:274)
If anyone is able to assist so we can get tests working again on the most recent version, that would be great!
Jessica Franks
07/05/2021, 5:57 PM
takan
07/06/2021, 4:23 AM
Hebo Yang
07/06/2021, 6:02 AM
Adrian Rumpold
07/06/2021, 6:43 AM