"""Tests for journal_lock.py code."""
from __future__ import annotations

import multiprocessing as mp
import os
import pathlib
import sys
from typing import Generator
import pytest
from pytest import MonkeyPatch, TempdirFactory, TempPathFactory
from config import config
from journal_lock import JournalLock, JournalLockResult


###########################################################################
# For some tests (at least on Linux) we need another process to already
# hold the lock.
# This is at top level due to multiprocessing.Process wanting to
# pickle its arguments, other_process_lock() being one of them.
def other_process_lock(continue_q: mp.Queue, exit_q: mp.Queue, lockfile: pathlib.Path):
    """
    Obtain the lock in a sub-process.

    :param continue_q: Write to this when parent should continue.
    :param exit_q: When there's an item in this, exit.
    :param lockfile: Path where the lockfile should be.
    """
    with open(lockfile / 'edmc-journal-lock.txt', mode='w+') as lf:
        print(f'sub-process: Opened {lockfile} for read...')
        # This needs to be kept in sync with journal_lock.py:_obtain_lock()
        if not _obtain_lock('sub-process', lf):
            print('sub-process: Failed to get lock, so returning')
            return

        print('sub-process: Got lock, telling main process to go...')
        continue_q.put('go', timeout=5)
        # Wait for signal to exit
        print('sub-process: Waiting for exit signal...')
        exit_q.get(block=True, timeout=None)

        # And clean up
        _release_lock('sub-process', lf)
        os.unlink(lockfile / 'edmc-journal-lock.txt')


def _obtain_lock(prefix: str, filehandle) -> bool:
    """
    Obtain the JournalLock.

    :param prefix: str - what to prefix output with.
    :param filehandle: File handle already open on the lockfile.
    :return: bool - True if we obtained the lock.
    """
    if sys.platform == 'win32':
        print(f'{prefix}: On win32')
        import msvcrt
        try:
            print(f'{prefix}: Trying msvcrt.locking() ...')
            msvcrt.locking(filehandle.fileno(), msvcrt.LK_NBLCK, 4096)

        except Exception as e:
            print(f'{prefix}: Unable to lock file: {e!r}')
            return False

    else:
        import fcntl

        print(f'{prefix}: Not win32, using fcntl')
        try:
            fcntl.flock(filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

        except Exception as e:
            print(f'{prefix}: Unable to lock file: {e!r}')
            return False

    return True


def _release_lock(prefix: str, filehandle) -> bool:
    """
    Release the JournalLock.

    :param prefix: str - what to prefix output with.
    :param filehandle: File handle already open on the lockfile.
    :return: bool - True if we released the lock.
    """
    if sys.platform == 'win32':
        print(f'{prefix}: On win32')
        import msvcrt
        try:
            print(f'{prefix}: Trying msvcrt.locking() ...')
            filehandle.seek(0)
            msvcrt.locking(filehandle.fileno(), msvcrt.LK_UNLCK, 4096)

        except Exception as e:
            print(f'{prefix}: Unable to unlock file: {e!r}')
            return False

    else:
        import fcntl

        print(f'{prefix}: Not win32, using fcntl')
        try:
            fcntl.flock(filehandle, fcntl.LOCK_UN)

        except Exception as e:
            print(f'{prefix}: Unable to unlock file: {e!r}')
            return False

    return True
###########################################################################


class TestJournalLock:
    """JournalLock test class."""

    @pytest.fixture
    def mock_journaldir(
        self, monkeypatch: MonkeyPatch,
        tmp_path_factory: TempdirFactory
    ) -> Generator:
        """Fixture for mocking config.get_str('journaldir')."""
        def get_str(key: str, *, default: str | None = None) -> str:
            """Mock config.*Config get_str to provide fake journaldir."""
            if key == 'journaldir':
                return str(tmp_path_factory.getbasetemp())

            print('Other key, calling up ...')
            return config.get_str(key)  # Call the non-mocked

        with monkeypatch.context() as m:
            m.setattr(config, "get_str", get_str)
            yield tmp_path_factory

    @pytest.fixture
    def mock_journaldir_changing(
            self,
            monkeypatch: MonkeyPatch,
            tmp_path_factory: TempdirFactory
    ) -> Generator:
        """Fixture for mocking config.get_str('journaldir')."""
        def get_str(key: str, *, default: str | None = None) -> str:
            """Mock config.*Config get_str to provide fake journaldir."""
            if key == 'journaldir':
                return tmp_path_factory.mktemp("changing")  # type: ignore

            print('Other key, calling up ...')
            return config.get_str(key)  # Call the non-mocked

        with monkeypatch.context() as m:
            m.setattr(config, "get_str", get_str)
            yield tmp_path_factory

    ###########################################################################
    # Tests against JournalLock.__init__()
    def test_journal_lock_init(self, mock_journaldir: TempPathFactory):
        """Test JournalLock instantiation."""
        print(f'{type(mock_journaldir)=}')
        tmpdir = str(mock_journaldir.getbasetemp())

        jlock = JournalLock()
        # Check members are properly initialised.
        assert jlock.journal_dir == tmpdir
        assert jlock.journal_dir_path is not None
        assert jlock.journal_dir_lockfile_name is None

    ###########################################################################
    # Tests against JournalLock.set_path_from_journaldir()
    def test_path_from_journaldir_with_none(self):
        """Test JournalLock.set_path_from_journaldir() with None."""
        jlock = JournalLock()

        # Check that 'None' is handled correctly.
        jlock.journal_dir = None
        jlock.set_path_from_journaldir()
        assert jlock.journal_dir_path is None

    def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.set_path_from_journaldir() with tmpdir."""
        tmpdir = mock_journaldir

        jlock = JournalLock()

        # Check that an actual journaldir is handled correctly.
        jlock.journal_dir = str(tmpdir)
        jlock.set_path_from_journaldir()
        assert isinstance(jlock.journal_dir_path, pathlib.Path)

    ###########################################################################
    # Tests against JournalLock.obtain_lock()
    def test_obtain_lock_with_none(self):
        """Test JournalLock.obtain_lock() with None."""
        jlock = JournalLock()

        # Check that 'None' is handled correctly.
        jlock.journal_dir = None
        jlock.set_path_from_journaldir()
        assert jlock.journal_dir_path is None
        locked = jlock.obtain_lock()
        assert locked == JournalLockResult.JOURNALDIR_IS_NONE

    def test_obtain_lock_with_tmpdir(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.obtain_lock() with tmpdir."""
        jlock = JournalLock()

        # Check that an actual journaldir is handled correctly.
        locked = jlock.obtain_lock()
        assert locked == JournalLockResult.LOCKED
        assert jlock.locked

        # Cleanup, to avoid side-effect on other tests
        assert jlock.release_lock()
        os.unlink(str(jlock.journal_dir_lockfile_name))

    def test_obtain_lock_with_tmpdir_ro(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.obtain_lock() with read-only tmpdir."""
        tmpdir = str(mock_journaldir.getbasetemp())
        print(f'{tmpdir=}')

        # Make tmpdir read-only ?
        if sys.platform == 'win32':
            # Ref: <https://stackoverflow.com/a/12168268>
            import ntsecuritycon as con
            import win32security

            # Fetch user details
            winuser, domain, type = win32security.LookupAccountName("", os.environ.get('USERNAME'))
            # Fetch the current security of tmpdir for that user.
            sd = win32security.GetFileSecurity(tmpdir, win32security.DACL_SECURITY_INFORMATION)
            dacl = sd.GetSecurityDescriptorDacl()  # instead of dacl = win32security.ACL()

            # Add Write to Denied list
            # con.FILE_WRITE_DATA results in a 'Special permissions' being
            # listed on Properties > Security for the user in the 'Deny' column.
            # Clicking through to 'Advanced' shows a 'Deny' for
            # 'Create files / write data'.
            dacl.AddAccessDeniedAce(win32security.ACL_REVISION, con.FILE_WRITE_DATA, winuser)
            # Apply that change.
            sd.SetSecurityDescriptorDacl(1, dacl, 0)  # may not be necessary
            win32security.SetFileSecurity(tmpdir, win32security.DACL_SECURITY_INFORMATION, sd)

        else:
            import stat
            os.chmod(tmpdir, stat.S_IRUSR | stat.S_IXUSR)

        jlock = JournalLock()

        # Check that an actual journaldir is handled correctly.
        locked = jlock.obtain_lock()

        # Revert permissions for test cleanup
        if sys.platform == 'win32':
            # We can reuse winuser etc from before
            import pywintypes

            # We have to call GetAce() until we find one that looks like what
            # we added.
            i = 0
            ace = dacl.GetAce(i)
            while ace:
                if ace[0] == (con.ACCESS_DENIED_ACE_TYPE, 0) and ace[1] == con.FILE_WRITE_DATA:
                    # Delete the Ace that we added
                    dacl.DeleteAce(i)
                    # Apply that change.
                    sd.SetSecurityDescriptorDacl(1, dacl, 0)  # may not be necessary
                    win32security.SetFileSecurity(tmpdir, win32security.DACL_SECURITY_INFORMATION, sd)
                    break

                i += 1
                try:
                    ace = dacl.GetAce(i)

                except pywintypes.error:
                    print("Couldn't find the Ace we added, so can't remove")
                    break

        else:
            os.chmod(tmpdir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

        assert locked == JournalLockResult.JOURNALDIR_READONLY

    def test_obtain_lock_already_locked(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.obtain_lock() with tmpdir."""
        continue_q: mp.Queue = mp.Queue()
        exit_q: mp.Queue = mp.Queue()
        locker = mp.Process(target=other_process_lock,
                            args=(continue_q, exit_q, mock_journaldir.getbasetemp())
                            )
        print('Starting sub-process other_process_lock()...')
        locker.start()
        # Wait for the sub-process to have locked
        print('Waiting for "go" signal from sub-process...')
        continue_q.get(block=True, timeout=5)

        print('Attempt actual lock test...')
        # Now attempt to lock with to-test code
        jlock = JournalLock()
        second_attempt = jlock.obtain_lock()
        assert second_attempt == JournalLockResult.ALREADY_LOCKED

        # Need to release any handles on the lockfile else the sub-process
        # might not be able to clean up properly, and that will impact
        # on later tests.
        jlock.journal_dir_lockfile.close()  # type: ignore

        print('Telling sub-process to quit...')
        exit_q.put('quit')
        print('Waiting for sub-process...')
        locker.join()
        print('Done.')

    ###########################################################################
    # Tests against JournalLock.release_lock()
    def test_release_lock(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.release_lock()."""
        # First actually obtain the lock, and check it worked
        jlock = JournalLock()
        jlock.obtain_lock()
        assert jlock.locked

        # Now release the lock
        assert jlock.release_lock()

        # And finally check it actually IS unlocked.
        with open(mock_journaldir.getbasetemp() / 'edmc-journal-lock.txt', mode='w+') as lf:
            assert _obtain_lock('release-lock', lf)
            assert _release_lock('release-lock', lf)

        # Cleanup, to avoid side-effect on other tests
        os.unlink(str(jlock.journal_dir_lockfile_name))

    def test_release_lock_not_locked(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.release_lock() when not locked."""
        jlock = JournalLock()
        assert jlock.release_lock()

    def test_release_lock_lie_locked(self, mock_journaldir: TempPathFactory):
        """Test JournalLock.release_lock() when not locked, but lie we are."""
        jlock = JournalLock()
        jlock.locked = True
        assert jlock.release_lock() is False

    ###########################################################################
    # Tests against JournalLock.update_lock()
    def test_update_lock(
            self,
            mock_journaldir_changing: TempPathFactory):
        """
        Test JournalLock.update_lock().

        NB: This uses mock_journaldir_changing so that each subsequent call
            to config.get_str('journaldir') gets a new, unique, path.
            This should mean that the call to update_lock() switches to a new
            journaldir.
        """
        # First actually obtain the lock, and check it worked
        jlock = JournalLock()
        jlock.obtain_lock()
        assert jlock.locked

        # Now store the 'current' journaldir for reference and attempt
        # to update to a new one.
        old_journaldir = jlock.journal_dir
        old_journaldir_lockfile_name = jlock.journal_dir_lockfile_name
        jlock.update_lock(None)  # type: ignore
        assert jlock.journal_dir != old_journaldir
        assert jlock.locked

        # Cleanup, to avoid side-effect on other tests
        assert jlock.release_lock()
        os.unlink(str(jlock.journal_dir_lockfile_name))
        # And the old_journaldir's lockfile too
        os.unlink(str(old_journaldir_lockfile_name))

    def test_update_lock_same(self, mock_journaldir: TempPathFactory):
        """
        Test JournalLock.update_lock().

        Due to using 'static' mock_journaldir this should 'work', because the
        directory is still the same.
        """
        # First actually obtain the lock, and check it worked
        jlock = JournalLock()
        assert jlock.obtain_lock() == JournalLockResult.LOCKED
        assert jlock.locked

        # Now store the 'current' journaldir for reference and attempt
        # to update to a new one.
        old_journaldir = jlock.journal_dir
        jlock.update_lock(None)  # type: ignore
        assert jlock.journal_dir == old_journaldir
        assert jlock.locked

        # Cleanup, to avoid side-effect on other tests
        assert jlock.release_lock()
        os.unlink(str(jlock.journal_dir_lockfile_name))