I am trying to create a CustomBaseOperator that inherits from the Airflow BaseOperator. The CustomBaseOperator works fine, but when I try to create a ChildOperator that inherits from the CustomBaseOperator, Airflow treats it as a CustomBaseOperator.
The CustomBaseOperator has an execute function that looks like this :
def make_request(self):
print("Parent request")
def execute(self, context):
self.make_request()
In the Child Operator, I redefine make_request:
class ChildOperator(CustomBaseOperator):
@apply_defaults
def make_request(self):
print("Child Request")
Whenever I run a task that uses ChildOperator, it prints "Parent request" and the legends shows it as a CustomBaseOperator ... My operators are in an "operators" folder in the "plugins" folder. I am guessing that I can only inherit from "official" operators when creating a custom operator in that folder. Do you know how I could make the inheritance work ?
CodePudding user response:
This works:
import datetime
from airflow import DAG
from airflow.models import BaseOperator
class CustomBaseOperator(BaseOperator):
def make_request(self):
print("Parent request")
def execute(self, context):
self.make_request()
class ChildOperator(CustomBaseOperator):
def make_request(self):
print("Child Request")
with DAG(dag_id="test_dag", start_date=datetime.datetime(2022, 1, 1), schedule_interval=None) as dag:
test = ChildOperator(task_id="test")
This will print Child Request and display ChildOperator in the Airflow UI. Also, note that setting @apply_defaults is deprecated since Airflow 2.0, it is now applied automatically.
CodePudding user response:
I found the issue : I was using a function that loops into locals() to set self.param = param for every param.
