Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/apify_client/_streamed_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import re
import threading
from asyncio import Task
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from threading import Thread
from typing import TYPE_CHECKING, Self, cast
from typing import TYPE_CHECKING, ClassVar, Self, cast

from apify_client._docs import docs_group

Expand Down Expand Up @@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
"""

# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
# this window instead of waiting for the long-polling default.
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)

def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
"""Initialize `StreamedLog`.

Expand Down Expand Up @@ -138,17 +142,17 @@ def __exit__(
self.stop()

def _stream_log(self) -> None:
with self._log_client.stream(raw=True) as log_stream:
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
if not log_stream:
return
for data in log_stream.iter_bytes():
self._process_new_data(data)
if self._stop_logging:
break

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)
return
try:
for data in log_stream.iter_bytes():
self._process_new_data(data)
if self._stop_logging:
break
finally:
# Flush the last buffered part even if the read timed out or was stopped.
self._log_buffer_content(include_last_part=True)


@docs_group('Other')
Expand Down Expand Up @@ -214,8 +218,9 @@ async def _stream_log(self) -> None:
async with self._log_client.stream(raw=True) as log_stream:
if not log_stream:
return
async for data in log_stream.aiter_bytes():
self._process_new_data(data)

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)
try:
async for data in log_stream.aiter_bytes():
self._process_new_data(data)
finally:
# Flush the last buffered part even if the task is cancelled by `stop()`.
self._log_buffer_content(include_last_part=True)
147 changes: 146 additions & 1 deletion tests/unit/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import json
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
Expand All @@ -15,7 +16,7 @@
from apify_client._logging import RedirectLogFormatter
from apify_client._models import ActorJobStatus
from apify_client._status_message_watcher import StatusMessageWatcherBase
from apify_client._streamed_log import StreamedLogBase
from apify_client._streamed_log import StreamedLog, StreamedLogBase

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -717,3 +718,147 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
elapsed = time.monotonic() - start

assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'


_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'


def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
    """Register the minimal run and actor endpoints required by `get_streamed_log`."""
    # Minimal-but-valid run payload: just enough fields for the client model to parse.
    run_payload = {
        'data': {
            'id': _MOCKED_RUN_ID,
            'actId': _MOCKED_ACTOR_ID,
            'userId': 'test_user_id',
            'startedAt': '2019-11-30T07:34:24.202Z',
            'finishedAt': '2019-12-12T09:30:12.202Z',
            'status': 'RUNNING',
            'statusMessage': 'Running',
            'isStatusMessageTerminal': False,
            'meta': {'origin': 'WEB'},
            'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
            'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
            'buildId': 'test_build_id',
            'generalAccess': 'RESTRICTED',
            'defaultKeyValueStoreId': 'test_kvs_id',
            'defaultDatasetId': 'test_dataset_id',
            'defaultRequestQueueId': 'test_rq_id',
            'buildNumber': '0.0.1',
            'containerUrl': 'https://test.runs.apify.net',
        }
    }
    # Actor payload backing the logger name (`apify.<actor_name>-<run_id>`) used by the tests.
    actor_payload = {
        'data': {
            'id': _MOCKED_ACTOR_ID,
            'userId': 'test_user_id',
            'name': _MOCKED_ACTOR_NAME,
            'username': 'test_user',
            'isPublic': False,
            'createdAt': '2019-07-08T11:27:57.401Z',
            'modifiedAt': '2019-07-08T14:01:05.546Z',
            'stats': {
                'totalBuilds': 0,
                'totalRuns': 0,
                'totalUsers': 0,
                'totalUsers7Days': 0,
                'totalUsers30Days': 0,
                'totalUsers90Days': 0,
                'totalMetamorphs': 0,
                'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
            },
            'versions': [],
            'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
            'deploymentKey': 'test_key',
        }
    }

    httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(run_payload)
    httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(actor_payload)


@pytest.mark.usefixtures('propagate_stream_logs')
async def test_streamed_log_async_stop_flushes_buffered_tail(
    caplog: LogCaptureFixture,
    httpserver: HTTPServer,
) -> None:
    """Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
    server_release = threading.Event()

    def _tail_handler(_request: Request) -> Response:
        def generate_logs() -> Iterator[bytes]:
            yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
            # Second marker has no trailing newline/next-marker, so it stays in the buffer.
            yield _TAIL_SECOND_MESSAGE.encode()
            # Block until the test tears the server down (or stop releases us).
            server_release.wait(timeout=30)

        return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')

    log_endpoint = f'/v2/actor-runs/{_MOCKED_RUN_ID}/log'
    httpserver.expect_request(
        log_endpoint, method='GET', query_string='stream=true&raw=true'
    ).respond_with_handler(_tail_handler)
    _register_run_and_actor_endpoints(httpserver)

    api_url = httpserver.url_for('/').removesuffix('/')
    run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
    streamed_log = await run_client.get_streamed_log()

    logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'

    try:
        with caplog.at_level(logging.DEBUG, logger=logger_name):
            async with streamed_log:
                # Wait long enough for both chunks to arrive and be processed.
                await asyncio.sleep(1)
            # Context exit calls stop(), which cancels the task mid-stream.
    finally:
        server_release.set()

    messages = [record.message for record in caplog.records]
    first_emitted = any(_TAIL_FIRST_MESSAGE in m for m in messages)
    tail_emitted = any(_TAIL_SECOND_MESSAGE in m for m in messages)
    assert first_emitted, f'First message missing. Got: {messages}'
    assert tail_emitted, f'Buffered tail dropped on async stop(). Got: {messages}'


@pytest.mark.usefixtures('propagate_stream_logs')
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
    httpserver: HTTPServer,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Verify `stop()` returns promptly even when the underlying stream is silent (no chunks).

    The helper thread running `stop()` is a daemon and is reaped in the cleanup path, so a
    regression (a hanging `stop()`) fails the assertion without leaking a non-daemon thread
    that could keep the test process alive at interpreter exit.
    """
    # Shorten the read timeout so the test doesn't wait for the production default.
    monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))

    release_server = threading.Event()

    def _silent_handler(_request: Request) -> Response:
        def generate_logs() -> Iterator[bytes]:
            # Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
            # response; then block without emitting any log data.
            yield b''
            release_server.wait(timeout=30)

        return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')

    httpserver.expect_request(
        f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
    ).respond_with_handler(_silent_handler)
    _register_run_and_actor_endpoints(httpserver)

    api_url = httpserver.url_for('/').removesuffix('/')
    run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
    streamed_log = run_client.get_streamed_log()

    streamed_log.start()
    # daemon=True: if the fix regresses and stop() blocks forever, the thread cannot keep the
    # interpreter alive after pytest finishes.
    stop_thread = threading.Thread(target=streamed_log.stop, daemon=True)
    try:
        # Give the streaming thread time to start and block inside iter_bytes.
        time.sleep(0.3)

        # Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
        stop_thread.start()
        stop_thread.join(timeout=5)
        assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
    finally:
        # Unblock the server handler so it can shut down, then reap the helper thread.
        release_server.set()
        # `ident` is only set once the thread has started; joining an unstarted thread raises.
        if stop_thread.ident is not None:
            stop_thread.join(timeout=5)