|
| 1 | +"""Tests for example implementations. |
| 2 | +
|
| 3 | +This test module verifies that the example implementations work as expected, |
| 4 | +covering both simple and full examples from the examples directory. |
| 5 | +""" |
| 6 | +import os |
| 7 | +from unittest.mock import patch |
| 8 | + |
| 9 | +import pytest |
| 10 | +from fastapi import FastAPI |
| 11 | +from fastapi.testclient import TestClient |
| 12 | +from google.protobuf import duration_pb2 |
| 13 | + |
| 14 | +from examples.simple.main import app as simple_app |
| 15 | +from examples.full.tasks import app as full_app |
| 16 | +from fastapi_gcp_tasks.utils import emulator_client, queue_path |
| 17 | + |
| 18 | + |
| 19 | +@pytest.fixture |
| 20 | +def simple_client(): |
| 21 | + """Create a test client for the simple example app.""" |
| 22 | + return TestClient(simple_app) |
| 23 | + |
| 24 | + |
| 25 | +@pytest.fixture |
| 26 | +def full_client(): |
| 27 | + """Create a test client for the full example app.""" |
| 28 | + return TestClient(full_app) |
| 29 | + |
| 30 | + |
| 31 | +def test_simple_example_local_mode(simple_client, monkeypatch): |
| 32 | + """Test simple example in local mode. |
| 33 | + |
| 34 | + This test verifies that: |
| 35 | + 1. Emulator client is used in local mode |
| 36 | + 2. Tasks are properly queued |
| 37 | + 3. Default settings work correctly |
| 38 | + 4. Environment variables are properly handled |
| 39 | + """ |
| 40 | + # Ensure we're in local mode |
| 41 | + monkeypatch.setenv("IS_LOCAL", "true") |
| 42 | + monkeypatch.setenv("TASK_LISTENER_BASE_URL", "http://localhost:8000") |
| 43 | + |
| 44 | + # Test trigger endpoint |
| 45 | + response = simple_client.get("/trigger") |
| 46 | + assert response.status_code == 200 |
| 47 | + assert response.json() == {"message": "Basic hello task triggered"} |
| 48 | + |
| 49 | + # Test hello task endpoint |
| 50 | + response = simple_client.post( |
| 51 | + "/delayed/hello", |
| 52 | + json={"message": "test message"} |
| 53 | + ) |
| 54 | + assert response.status_code == 200 |
| 55 | + |
| 56 | + |
| 57 | +def test_full_example_chained_hooks(full_client, monkeypatch): |
| 58 | + """Test full example with chained hooks. |
| 59 | + |
| 60 | + This test verifies that: |
| 61 | + 1. OIDC and deadline hooks work together |
| 62 | + 2. Hook order is preserved |
| 63 | + 3. Hook configuration is correct |
| 64 | + 4. Hooks are properly applied to tasks |
| 65 | + """ |
| 66 | + # Set up test environment |
| 67 | + monkeypatch.setenv("IS_LOCAL", "true") |
| 68 | + monkeypatch.setenv("CLOUD_TASKS_EMULATOR_URL", "http://localhost:8123") |
| 69 | + monkeypatch.setenv("TASK_LISTENER_BASE_URL", "http://localhost:8000") |
| 70 | + |
| 71 | + # Test hello task with chained hooks |
| 72 | + response = full_client.post( |
| 73 | + "/delayed/hello", |
| 74 | + json={"message": "test with hooks"} |
| 75 | + ) |
| 76 | + assert response.status_code == 200 |
| 77 | + |
| 78 | + # Test fail_twice with retries |
| 79 | + response = full_client.post("/delayed/fail_twice") |
| 80 | + assert response.status_code == 500 # Should fail after 2 retries |
| 81 | + |
| 82 | + # Test scheduled hello with hooks |
| 83 | + response = full_client.post( |
| 84 | + "/scheduled/timed_hello", |
| 85 | + json={"message": "test scheduled with hooks"} |
| 86 | + ) |
| 87 | + assert response.status_code == 200 |
| 88 | + assert response.json() == { |
| 89 | + "message": "Scheduled hello task ran with payload: test scheduled with hooks" |
| 90 | + } |
| 91 | + |
| 92 | + |
| 93 | +def test_full_example_hook_configuration(): |
| 94 | + """Test hook configuration in full example. |
| 95 | + |
| 96 | + This test verifies that: |
| 97 | + 1. OIDC token is properly configured |
| 98 | + 2. Deadline duration is set correctly |
| 99 | + 3. Hooks are chained in the right order |
| 100 | + """ |
| 101 | + with patch("fastapi_gcp_tasks.hooks.oidc_delayed_hook") as mock_oidc_hook: |
| 102 | + with patch("fastapi_gcp_tasks.hooks.deadline_delayed_hook") as mock_deadline_hook: |
| 103 | + # Import here to trigger hook creation with mocks |
| 104 | + from examples.full.tasks import DelayedRoute |
| 105 | + |
| 106 | + # Verify OIDC hook was called |
| 107 | + mock_oidc_hook.assert_called_once() |
| 108 | + |
| 109 | + # Verify deadline hook was called with correct duration |
| 110 | + mock_deadline_hook.assert_called_once_with( |
| 111 | + duration=duration_pb2.Duration(seconds=1800) |
| 112 | + ) |
| 113 | + |
| 114 | + |
| 115 | +def test_simple_example_environment_handling(monkeypatch): |
| 116 | + """Test environment handling in simple example. |
| 117 | + |
| 118 | + This test verifies that: |
| 119 | + 1. Local mode uses emulator client |
| 120 | + 2. Environment variables are properly handled |
| 121 | + 3. Default values are used when needed |
| 122 | + """ |
| 123 | + # Test local mode |
| 124 | + monkeypatch.setenv("IS_LOCAL", "true") |
| 125 | + from examples.simple.main import client |
| 126 | + assert client is not None |
| 127 | + |
| 128 | + # Test non-local mode |
| 129 | + monkeypatch.setenv("IS_LOCAL", "false") |
| 130 | + with patch("examples.simple.main.tasks_v2.CloudTasksClient") as mock_client: |
| 131 | + # Reimport to trigger client creation |
| 132 | + from importlib import reload |
| 133 | + import examples.simple.main |
| 134 | + reload(examples.simple.main) |
| 135 | + mock_client.assert_called_once() |
| 136 | + |
| 137 | + |
| 138 | +def test_deployed_environment_scheduling(full_client, monkeypatch): |
| 139 | + """Test scheduling in deployed environment. |
| 140 | + |
| 141 | + This test verifies that: |
| 142 | + 1. Scheduling only occurs when not local |
| 143 | + 2. OIDC tokens are properly used |
| 144 | + 3. Cloud Scheduler integration works |
| 145 | + 4. Environment variables affect scheduling behavior |
| 146 | + """ |
| 147 | + # Mock Cloud Scheduler client |
| 148 | + with patch("google.cloud.scheduler_v1.CloudSchedulerClient") as mock_scheduler: |
| 149 | + # Set up deployed environment |
| 150 | + monkeypatch.setenv("IS_LOCAL", "false") |
| 151 | + monkeypatch.setenv("TASK_LISTENER_BASE_URL", "https://example.com") |
| 152 | + monkeypatch.setenv("SCHEDULED_OIDC_TOKEN", "test-token") |
| 153 | + |
| 154 | + # Reload module to trigger scheduling |
| 155 | + from importlib import reload |
| 156 | + import examples.full.tasks |
| 157 | + reload(examples.full.tasks) |
| 158 | + |
| 159 | + # Verify scheduler client was used |
| 160 | + mock_scheduler.assert_called_once() |
| 161 | + |
| 162 | + # Verify scheduled task creation |
| 163 | + scheduler_instance = mock_scheduler.return_value |
| 164 | + create_job = scheduler_instance.create_job |
| 165 | + assert create_job.called |
| 166 | + |
| 167 | + # Verify job configuration |
| 168 | + job_args = create_job.call_args[0] |
| 169 | + assert len(job_args) == 2 # parent and job args |
| 170 | + job = job_args[1] |
| 171 | + |
| 172 | + # Verify schedule |
| 173 | + assert job.schedule == "*/5 * * * *" |
| 174 | + assert job.time_zone == "Asia/Kolkata" |
| 175 | + |
| 176 | + # Verify OIDC token |
| 177 | + assert job.http_target.oidc_token.service_account_email == "test-token" |
| 178 | + |
| 179 | + # Test endpoint still works |
| 180 | + response = full_client.post( |
| 181 | + "/scheduled/timed_hello", |
| 182 | + json={"message": "test in deployed mode"} |
| 183 | + ) |
| 184 | + assert response.status_code == 200 |
| 185 | + assert response.json() == { |
| 186 | + "message": "Scheduled hello task ran with payload: test in deployed mode" |
| 187 | + } |
0 commit comments