Hello everyone, i tried to run Logstash job with d...
# ask-community
k
Hello everyone, i tried to run Logstash job with dagster using
shell_op()
, but unfortunately I got this error
Copy code
dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'shell_op' has context argument, but no context was provided when invoking.
and I still don't understand what dagster means context here. Here is my .py code
Copy code
@op
def Load_To_Elastic(context, table):
    """Loading jobs from warehouse to elasticsearch"""
    bash_command = "/Users/kautsaraqsa/ELK/logstash-7.15.2/bin/logstash -f /Users/kautsaraqsa/code/AIHSP/badung-case.conf"
    <http://context.log.info|context.log.info>("Indexing table:" + table)
    shell_op(context, bash_command)
@graph
def Pipeline():
    """Load table to warehouse then transfer it to Elasticsearch with Logstash"""
    table = Ingest_From_Synchro()
    Load_To_Elastic(table)
Hope anyone can help me with this, thank you!
I ended up using python library called
subprocess
to run the command. But still hoping anyone can explain the error 🙏
z
I think it's because you're calling an op from within another op, which isn't supported. I think maybe restructuring your code to use shell_op in your graph instead might work:
Copy code
@op
def generate_bash_command(context, table):
    """Loading jobs from warehouse to elasticsearch"""
    bash_command = "/Users/kautsaraqsa/ELK/logstash-7.15.2/bin/logstash -f /Users/kautsaraqsa/code/AIHSP/badung-case.conf"
    <http://context.log.info|context.log.info>("Indexing table:" + table)
    
@graph
def Pipeline():
    """Load table to warehouse then transfer it to Elasticsearch with Logstash"""
    table = Ingest_From_Synchro()
    bash_command = generate_bash_command(table)
    shell_op.alias("Load_To_Elastic")(bash_command)
🤖 1
👀 1
k
Thank you so much for the explanation!