#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
import logging.config
import warnings
from io import StringIO
from typing import Optional
from unittest.mock import Mock

from pyperf import perf_counter

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ServerFactory
from twisted.logger import LogBeginner, LogPublisher
from twisted.protocols.basic import LineOnlyReceiver

from synapse.config.logger import _setup_stdlib_logging
from synapse.logging import RemoteHandler
from synapse.synapse_rust import reset_logging_config
from synapse.types import ISynapseReactor
from synapse.util import Clock


class LineCounter(LineOnlyReceiver):
    """
    Line-based protocol that tallies the lines received on its connection.

    Once the tally reaches the owning factory's `wait_for`, the factory's
    `on_done` deferred (if still set) is cleared and fired with `True`.
    """

    # Lines are newline-terminated.
    delimiter = b"\n"
    # Class-level default; the first `+=` creates a per-instance tally.
    count = 0

    def lineReceived(self, line: bytes) -> None:
        """Count one received line and signal completion when enough arrived."""
        self.count += 1

        owner = self.factory
        assert isinstance(owner, Factory)

        # Not enough lines yet: nothing to signal.
        if self.count < owner.wait_for:
            return

        pending = owner.on_done
        if not pending:
            # Completion has already been signalled (or was never requested).
            return

        # Clear the deferred before firing it so it can only fire once.
        owner.on_done = None
        pending.callback(True)


class Factory(ServerFactory):
    """
    Server factory whose connections are handled by `LineCounter`.

    Neither attribute has a default here; both must be assigned on the
    instance before any line is received.
    """

    # Fired with `True` by `LineCounter` once enough lines have arrived, and
    # reset to `None` at that point.
    on_done: Optional[Deferred]
    # Number of lines a connection must receive before `on_done` is fired.
    wait_for: int

    protocol = LineCounter


async def main(reactor: ISynapseReactor, loops: int) -> float:
    """
    Benchmark how long it takes to send `loops` messages.

    A line-counting TCP server is started on a local port, the `synapse`
    logger is configured with a `RemoteHandler` pointing at that port, and
    `loops` INFO messages are logged. The measurement stops once the server
    has received `loops` lines.

    Args:
        reactor: The reactor used to listen on the local port and to sleep.
        loops: The number of log messages to send (and lines to wait for).

    Returns:
        The `perf_counter` time elapsed between logging the first message and
        the server reporting that `loops` lines were received.

    Raises:
        RuntimeError: If no `RemoteHandler` is attached to the `synapse`
            logger after the logging config has been applied.
    """

    # Keep our own reference to the completion deferred: `LineCounter` resets
    # `logger_factory.on_done` to None when it fires, which may happen while
    # we are sleeping in the back-pressure loop below.
    done: Deferred = Deferred()

    logger_factory = Factory()
    logger_factory.wait_for = loops
    logger_factory.on_done = done
    port = reactor.listenTCP(0, logger_factory, backlog=50, interface="127.0.0.1")

    # Set once the handler has been located, so the cleanup below knows
    # whether there is anything to close.
    handler: Optional[RemoteHandler] = None

    try:
        # A fake homeserver config.
        class Config:
            class server:
                server_name = "synmark-" + str(loops)

            # This odd construct is to avoid mypy thinking that logging escapes the
            # scope of Config.
            class _logging:
                no_redirect_stdio = True

            logging = _logging

        hs_config = Config()

        # To be able to sleep.
        clock = Clock(reactor)

        errors = StringIO()
        publisher = LogPublisher()
        mock_sys = Mock()
        beginner = LogBeginner(
            publisher, errors, mock_sys, warnings, initialBufferSize=loops
        )

        address = port.getHost()
        assert isinstance(address, (IPv4Address, IPv6Address))
        log_config = {
            "version": 1,
            "loggers": {"synapse": {"level": "DEBUG", "handlers": ["remote"]}},
            "formatters": {
                "tersejson": {"class": "synapse.logging.TerseJsonFormatter"}
            },
            "handlers": {
                "remote": {
                    "class": "synapse.logging.RemoteHandler",
                    "formatter": "tersejson",
                    "host": address.host,
                    "port": address.port,
                    "maximum_buffer": 100,
                }
            },
        }

        logger = logging.getLogger("synapse")
        _setup_stdlib_logging(
            hs_config,  # type: ignore[arg-type]
            None,
            logBeginner=beginner,
        )

        # Force a new logging config without having to load it from a file.
        logging.config.dictConfig(log_config)
        reset_logging_config()

        # Locate the handler that the config above attached to the logger.
        handler = next(
            (h for h in logger.handlers if isinstance(h, RemoteHandler)), None
        )
        if handler is None:
            raise RuntimeError("Improperly configured: no RemoteHandler found.")

        # Wait for it to connect...
        await handler._service.whenConnected(failAfterFailures=10)

        start = perf_counter()

        # Send a bunch of useful messages
        for i in range(loops):
            logger.info("test message %s", i)

            # Once the handler's buffer is full, pause until it has drained to
            # half capacity. `>=` rather than `==` so that an overshoot cannot
            # skip the back-off.
            if len(handler._buffer) >= handler.maximum_buffer:
                while len(handler._buffer) > handler.maximum_buffer / 2:
                    await clock.sleep(0.01)

        # Await the local reference, not `logger_factory.on_done`, which is
        # None if the server has already seen every line.
        await done

        elapsed = perf_counter() - start
    finally:
        # Release the handler and the listening port even if the benchmark
        # failed part-way through.
        if handler is not None:
            handler.close()
        port.stopListening()

    return elapsed