Use unittest.mock.

Here is my operator:
```
class CloudDataCatalogCreateEntryOperator(BaseOperator):
    """Create a Data Catalog entry, falling back to the existing one on conflict.

    ``execute`` asks ``CloudDataCatalogHook.create_entry`` to create the entry.
    If that call raises ``AlreadyExists``, the entry is fetched with
    ``CloudDataCatalogHook.get_entry`` instead. In both cases the last path
    segment of the entry's ``name`` is pushed to XCom under the key
    ``entry_id`` and the entry is returned via ``MessageToDict``.

    :param location: Location forwarded to the hook's ``create_entry`` and
        ``get_entry`` calls.
    :param entry_group: Entry group identifier forwarded to the hook.
    :param entry_id: ID for the new entry; also used to look up the existing
        entry when creation reports ``AlreadyExists``.
    :param entry: Entry payload (a dict or an ``Entry``) passed to
        ``create_entry``.
    :param project_id: Optional project ID forwarded to the hook.
    :param retry: Optional retry object forwarded to the hook.
    :param timeout: Optional timeout forwarded to the hook
        (presumably seconds -- confirm against the hook).
    :param metadata: Optional sequence of ``(str, str)`` pairs forwarded to
        the hook.
    :param gcp_conn_id: Connection ID used to construct the
        ``CloudDataCatalogHook``.
    """

    # Every name below matches an attribute assigned in ``__init__``.
    template_fields = (
        "location",
        "entry_group",
        "entry_id",
        "entry",
        "project_id",
        "retry",
        "timeout",
        "metadata",
        "gcp_conn_id",
    )

    @apply_defaults
    def __init__(
        self,
        location: str,
        entry_group: str,
        entry_id: str,
        entry: Union[Dict, Entry],
        project_id: str = None,
        retry: Retry = None,
        timeout: float = None,
        metadata: Sequence[Tuple[str, str]] = None,
        gcp_conn_id: str = "google_cloud_default",
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        # Connection used to build the hook in ``execute``.
        self.gcp_conn_id = gcp_conn_id

        # Identity and content of the entry.
        self.project_id = project_id
        self.location = location
        self.entry_group = entry_group
        self.entry_id = entry_id
        self.entry = entry

        # Per-call options handed to the hook unchanged.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Dict):
        """Create the entry (or fetch it if it already exists) and publish its ID.

        :param context: Task context; ``context["task_instance"]`` must provide
            ``xcom_push``.
        :return: The result of ``MessageToDict`` applied to the created or
            fetched entry.
        """
        catalog_hook = CloudDataCatalogHook(gcp_conn_id=self.gcp_conn_id)

        # Keyword arguments common to both the create call and the fallback
        # lookup, collected once so the two calls stay in sync.
        shared_options = {
            "project_id": self.project_id,
            "retry": self.retry,
            "timeout": self.timeout,
            "metadata": self.metadata,
        }

        try:
            catalog_entry = catalog_hook.create_entry(
                location=self.location,
                entry_group=self.entry_group,
                entry_id=self.entry_id,
                entry=self.entry,
                **shared_options
            )
        except AlreadyExists:
            self.log.info("Entry already exists. Skipping create operation.")
            # Note: ``get_entry`` takes the ID through its ``entry`` keyword.
            catalog_entry = catalog_hook.get_entry(
                location=self.location,
                entry_group=self.entry_group,
                entry=self.entry_id,
                **shared_options
            )

        # Keep only what follows the final "/" of the entry name.
        current_entry_id = catalog_entry.name.rpartition("/")[-1]
        self.log.info("Current entry_id ID: %s", current_entry_id)

        task_instance = context["task_instance"]
        task_instance.xcom_push(key="entry_id", value=current_entry_id)
        return MessageToDict(catalog_entry)

```

Here is my unittest:
    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{
            "return_value.create_entry.side_effect": AlreadyExists(message="message"),
            "return_value.get_entry.return_value": TEST_ENTRY,
        },
    )
    def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
        """Check the operator's behaviour when ``create_entry`` raises ``AlreadyExists``.

        The patched hook fails on ``create_entry`` and returns ``TEST_ENTRY``
        from ``get_entry``. The test then verifies the hook construction, both
        hook calls, the XCom push and the value returned by ``execute``.
        """
        # Options the operator is expected to forward unchanged to both hook calls.
        forwarded_options = dict(
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )

        operator = CloudDataCatalogCreateEntryOperator(
            task_id="task_id",
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry_id=TEST_ENTRY_ID,
            entry=TEST_ENTRY,
            gcp_conn_id=TEST_GCP_CONN_ID,
            **forwarded_options
        )

        # A MagicMock stands in for the task instance so xcom_push can be inspected.
        task_instance = mock.MagicMock()
        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)

        hook_instance = mock_hook.return_value
        hook_instance.create_entry.assert_called_once_with(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry_id=TEST_ENTRY_ID,
            entry=TEST_ENTRY,
            **forwarded_options
        )
        # The fallback lookup receives the entry ID through the ``entry`` keyword.
        hook_instance.get_entry.assert_called_once_with(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            **forwarded_options
        )

        task_instance.xcom_push.assert_called_once_with(
            key="entry_id", value=TEST_ENTRY_ID
        )
        self.assertEqual(TEST_ENTRY_DICT, returned)


On Mon, Feb 10, 2020 at 2:02 PM Lior Harel <[email protected]> wrote:
>
> We're testing our operators with something along these lines:
>
> with DAG(dag_id='anydag', start_date=datetime.now()) as dag:
>     operator = MyOperator(task_id='test_task')
>     ti = TaskInstance(task=operator, execution_date=datetime.now())
>     context = ti.get_template_context()
>     operator.execute(context)
>
>
> MyOperator reads XCom data from its context. Is there a way to set the XCom 
> data in the test before executing the operator?
> Is there a simpler way to test the operator without setting up DAG and task 
> instance?
>
> Thanks,

Reply via email to