API アクセスのサンプルコード

ダウンロード
                
import asyncio
import json
from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_NONE
import websockets
import sys


class AnalyzerException(Exception): pass
class UnauthorizedException(Exception): pass


class AnalyzerClient:
    def __init__(self, uri, ssl=None):
        self.uri = uri
        self.ssl = ssl

    async def __aenter__(self):
        self.ws = await websockets.connect(self.uri, ssl=self.ssl)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.ws.close()

    async def process(self, action, text):
        await self.ws.send(json.dumps({
            "type": "analyze",
            "action": action,
            "text": text,
        }))

        state = None
        while state not in ["E", "F"]:
            result = json.loads(await self.ws.recv())
            if result["type"] == "unauthorized":
                raise UnauthorizedException("Not authorized")
            state = result["state"]

        if state == "E":
            raise AnalyzerException("Serverside error")

        await self.ws.send(json.dumps({
            "type": "show",
            "id": result["id"],
        }))
        result = json.loads(await self.ws.recv())
        return result

    @classmethod
    async def process_async(cls, uri, username, password, action, text, ssl=None):
        uri += f"?username={username}&password={password}"
        async with cls(uri, ssl) as client:
            return await client.process(action, text)

    @classmethod
    def process_sync(cls, uri, username, password, action, text, ssl=None):
        return asyncio.get_event_loop().run_until_complete(
            cls.process_async(uri, username, password, action, text, ssl))


if __name__ == "__main__":
    import argparse
    from pathlib import Path

    parser = argparse.ArgumentParser()
    parser.add_argument("input_text", nargs='?', type=Path)
    parser.add_argument("-u", "--username", type=str)
    parser.add_argument("-p", "--password", type=str)
    parser.add_argument("-a", "--action", type=str,
        default='analyze',
        choices=[
            'elink',
            'analyze',
            'analyze-process',
        ],
    )
    parser.add_argument("-o", "--out", type=Path)
    parser.add_argument("-b", "--save-brat", action="store_true")
    parser.add_argument("-w", "--save-webanno", action="store_true")
    parser.add_argument("-r", "--save-relations", action="store_true")
    args = parser.parse_args()

    uri = "wss://material-analyzer.airc.aist.go.jp/analyzer/ws/brat/"

    # handle input
    if args.input_text:
        with args.input_text.open("rt", encoding="utf-8") as f:
            text = f.read()
        basename = args.input_text.with_suffix('').name
    else:
        text = sys.stdin.read()
        basename = 'result'

    # handle output file names
    if args.out:
        out_dir = args.out.parent
        basename = args.out.name
        for suffix in ['.tsv', '.rt', '.webanno', '.ann', '.txt', '.brat']:
            if basename.endswith(suffix):
                basename = basename[:-len(suffix)]
    else:
        if args.input_text:
            out_dir = args.input_text.parent
        else:
            out_dir = Path('.')

    # handle formats:
    no_output_selected = not any(save for save in [
        args.save_brat,
        args.save_webanno,
        args.save_relations,
    ])

    # handle credentials
    if not args.username:
        args.username = input("Username: ")
    if not args.password:
        from getpass import getpass
        args.password = getpass("Password: ")

    ssl = uri.startswith("wss:") or None

    # call analyzer
    result = AnalyzerClient.process_sync(uri, args.username, args.password, args.action, text, ssl=ssl)

    # save TSV (.tsv)
    webanno_data = result.get("tsv")
    if no_output_selected or args.save_webanno:
        if webanno_data:
            no_output_selected = False
            with (out_dir / (basename + ".webanno.tsv")).open("w") as w:
                w.write(webanno_data)
        elif args.save_webanno:
                print("Server returned no Webanno data")

    # save relation tuples (.rt.tsv)
    relation_data = next((
        downloadable
        for downloadable in result.get('extra', {}).get('downloadables', [])
        if downloadable['label'] == '.rt.tsv'
    ), None)
    if no_output_selected or args.save_relations:
        if relation_data:
            no_output_selected = False
            relation_tuples = relation_data['content']
            with (out_dir / (basename + ".relations.tsv")).open("w") as w:
                w.write(relation_tuples)
        elif args.save_relations:
            print("The server returned no relation tuples.")

    # save brat data
    brat_doc = result.get("doc", {})
    ann_data = brat_doc.get("annfile")
    txt_data = brat_doc.get("text")
    if no_output_selected or args.save_brat:
        if ann_data:
            with (out_dir / (basename + ".brat.ann")).open("w") as w:
                w.writelines(ann_data)
        if txt_data:
            with (out_dir / (basename + ".brat.txt")).open("w") as w:
                w.writelines(txt_data)
        elif args.save_brat:
            print("The server returned no brat data.")

    # No data?
    if not any(data for data in [
        webanno_data,
        relation_data,
        ann_data,
        txt_data,
    ]):
        job_id = result["id"]
        print("For some reason, the server returned no data.")
        print(f"Please report to admins as incident {job_id}")