Kautsar Aqsa
08/05/2022, 8:36 AMshell_op()
, but unfortunately I got this error
dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'shell_op' has context argument, but no context was provided when invoking.
and I still don't understand what dagster means context here. Here is my .py code
@op
def Load_To_Elastic(context, table):
"""Loading jobs from warehouse to elasticsearch"""
bash_command = "/Users/kautsaraqsa/ELK/logstash-7.15.2/bin/logstash -f /Users/kautsaraqsa/code/AIHSP/badung-case.conf"
<http://context.log.info|context.log.info>("Indexing table:" + table)
shell_op(context, bash_command)
@graph
def Pipeline():
"""Load table to warehouse then transfer it to Elasticsearch with Logstash"""
table = Ingest_From_Synchro()
Load_To_Elastic(table)
Hope anyone can help me with this, thank you!subprocess
to run the command. But still hoping anyone can explain the error 🙏Zach
08/05/2022, 4:06 PM@op
def generate_bash_command(context, table):
"""Loading jobs from warehouse to elasticsearch"""
bash_command = "/Users/kautsaraqsa/ELK/logstash-7.15.2/bin/logstash -f /Users/kautsaraqsa/code/AIHSP/badung-case.conf"
<http://context.log.info|context.log.info>("Indexing table:" + table)
@graph
def Pipeline():
"""Load table to warehouse then transfer it to Elasticsearch with Logstash"""
table = Ingest_From_Synchro()
bash_command = generate_bash_command(table)
shell_op.alias("Load_To_Elastic")(bash_command)
Kautsar Aqsa
08/21/2022, 5:24 PM