# Copyright (C) 2011-2013 Red Hat, Inc.
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see https://www.gnu.org/licenses/.
#
# Author: tasleson
import json
import socket
import string
import os
import unittest
import threading
from lsm._common import LsmError, ErrorNumber
from lsm._common import SocketEOF as _SocketEOF
from lsm._data import DataDecoder as _DataDecoder
from lsm._data import DataEncoder as _DataEncoder
class TransPort(object):
"""
Provides wire serialization using JSON. Loosely conforms to JSON-RPC;
however, a length header was added so that non-SAX-like JSON parsers,
which are more abundant, can be used.
= 1)
for i in wire:
self.c.send(i)
reply, msg_id = self.client.read_resp()
self.assertTrue(payload == reply)
def tearDown(self):
    """
    Per-test cleanup: send a "done" request through the client, check the
    reply, then wait on the server.

    Sends the request "done" with None as its argument via
    self.client.send_req(), reads the reply with self.client.read_resp()
    and asserts that the reply payload is None.  Finally blocks in
    self.server.join().

    NOTE(review): presumably "done" tells the server side to exit and
    self.server is the thread started during setUp -- confirm against
    setUp, which is not visible from this block.
    """
    self.client.send_req("done", None)

    # read_resp() returns a (payload, message id) pair; only the payload is
    # checked here, so the id is bound to an underscore-prefixed name.
    resp, _msg_id = self.client.read_resp()

    # assertIsNone reports the unexpected value on failure, unlike
    # assertTrue(resp is None), which only says "False is not true".
    self.assertIsNone(resp)

    # Kept after the assertion on purpose: if the reply was wrong the server
    # may not be shutting down, and join() could block.
    self.server.join()
# Run this module's test cases when the file is executed directly as a script;
# importing the module does not start the test runner.
if __name__ == "__main__":
    unittest.main()