Home > Software engineering >  Inherit from a Custom Operator in Airflow
Inherit from a Custom Operator in Airflow

Time:01-18

I am trying to create a CustomBaseOperator that inherits from the Airflow BaseOperator. The CustomBaseOperator works fine, but when I try to create a ChildOperator that inherits from the CustomBaseOperator, Airflow treats it as a CustomBaseOperator.

The CustomBaseOperator has an execute function that looks like this :

def make_request(self): 
    print("Parent request")

def execute(self, context): 

    self.make_request()

In the Child Operator, I redefine make_request:



class ChildOperator(CustomBaseOperator): 
    
    @apply_defaults
    

    def make_request(self): 
        print("Child Request")

Whenever I run a task that uses ChildOperator, it prints "Parent request" and the legends shows it as a CustomBaseOperator ... My operators are in an "operators" folder in the "plugins" folder. I am guessing that I can only inherit from "official" operators when creating a custom operator in that folder. Do you know how I could make the inheritance work ?

CodePudding user response:

This works:

import datetime

from airflow import DAG
from airflow.models import BaseOperator


class CustomBaseOperator(BaseOperator):
    def make_request(self):
        print("Parent request")

    def execute(self, context):
        self.make_request()


class ChildOperator(CustomBaseOperator):
    def make_request(self):
        print("Child Request")


with DAG(dag_id="test_dag", start_date=datetime.datetime(2022, 1, 1), schedule_interval=None) as dag:
    test = ChildOperator(task_id="test")

This will print Child Request and display ChildOperator in the Airflow UI. Also, note that setting @apply_defaults is deprecated since Airflow 2.0, it is now applied automatically.

CodePudding user response:

I found the issue : I was using a function that loops into locals() to set self.param = param for every param.

  •  Tags:  
  • Related