# blob: 353ce175f540edbd786bf73c775b4df8ebd47d86 [file] [log] [blame]
"""
NETCONF connections

Author: Radek Krejci
"""
import json
import os
from liberouterapi import auth
from flask import request
import libyang as ly
import netconf2 as nc
from .inventory import INVENTORY
from .devices import devices_get
from .error import NetopeerException
# Registry of the NETCONF sessions created by connect():
# sessions[username][session-key] -> session object.
sessions = {}
def connect():
    """
    Open a NETCONF session to one of the caller's stored devices.

    The user is resolved from the request's ``Authorization`` header and the
    device is looked up via ``devices_get()`` using the ``id`` member of the
    JSON request body. The created session is stored in the module-level
    ``sessions`` registry under the user's name.

    Returns:
        A JSON string: ``{'success': True, 'session-key': key}`` when the
        session was created, or ``{'success': False, 'error-msg': ...}`` when
        creating it raised an exception.

    Raises:
        NetopeerException: if the request carries no device id, or
            ``devices_get()`` returns nothing for that id.
    """
    auth_session = auth.lookup(request.headers.get('Authorization', None))
    user = auth_session['user']
    # NOTE(review): no statement visible in this function consumes `path`;
    # a line using it may have been lost -- confirm before removing.
    path = os.path.join(INVENTORY, user.username)

    # A missing/non-object body or a missing 'id' is reported as an invalid
    # request instead of escaping as a KeyError/TypeError.
    payload = request.get_json()
    device_id = payload.get('id') if isinstance(payload, dict) else None
    if not device_id:
        raise NetopeerException('Invalid connect request.')

    device = devices_get(device_id, user.username)
    if not device:
        raise NetopeerException('Unknown device to connect to request.')

    # Any failure while setting up the connection is returned to the client
    # as an error message rather than raised.
    try:
        ssh = nc.SSH(device['username'], password=device['password'])
        nc_session = nc.Session(device['hostname'], device['port'], ssh)
    except Exception as e:
        return(json.dumps({'success': False, 'error-msg': str(e)}))

    user_sessions = sessions.setdefault(user.username, {})

    # use key (as hostname:port:session-id) to store the created NETCONF session
    # NOTE(review): the first and last operands of this expression were missing
    # from the source; `host` and `id` are reconstructed from the comment above
    # -- confirm these attribute names against the netconf2 Session API.
    key = ":".join((str(nc_session.host), str(nc_session.port), str(nc_session.id)))
    user_sessions[key] = nc_session

    return(json.dumps({'success': True, 'session-key': key}))
def session_get_capabilities():
    """
    Return the capabilities of one of the caller's NETCONF sessions.

    The user is resolved from the request's ``Authorization`` header and the
    session is selected by the ``key`` query argument.

    Returns:
        A JSON string: ``{'success': True, 'capabilities': [...]}`` on success,
        or ``{'success': False, 'error-msg': ...}`` when the key is missing
        from the request or is not present in the user's registry entry.
    """
    auth_session = auth.lookup(request.headers.get('Authorization', None))
    user = auth_session['user']
    req = request.args.to_dict()

    if 'key' not in req:
        return(json.dumps({'success': False, 'error-msg': 'Missing session key.'}))

    # Creates the user's (empty) registry entry on first access, as before.
    user_sessions = sessions.setdefault(user.username, {})

    key = req['key']
    if key not in user_sessions:
        return(json.dumps({'success': False, 'error-msg': 'Invalid session key.'}))

    # NOTE(review): the loop body that filled this list was missing from the
    # source. Each capability is collected as a string so the list can be
    # serialized -- confirm this matches the intended output.
    cpblts = [str(c) for c in user_sessions[key].capabilities]

    return(json.dumps({'success': True, 'capabilities': cpblts}))
def session_get():
    """
    Run ``rpcGet()`` on one of the caller's NETCONF sessions and return the data.

    The user is resolved from the request's ``Authorization`` header and the
    session is selected by the ``key`` query argument.

    Returns:
        A JSON string: ``{'success': True, 'data': ...}`` with the reply
        printed as JSON by libyang, ``{'success': False, 'error': [...]}`` when
        the call raised ``nc.ReplyError``, or
        ``{'success': False, 'error-msg': ...}`` when the key is missing from
        the request or is not present in the user's registry entry.
    """
    auth_session = auth.lookup(request.headers.get('Authorization', None))
    user = auth_session['user']
    req = request.args.to_dict()

    if 'key' not in req:
        return(json.dumps({'success': False, 'error-msg': 'Missing session key.'}))

    # Creates the user's (empty) registry entry on first access, as before.
    user_sessions = sessions.setdefault(user.username, {})

    key = req['key']
    if key not in user_sessions:
        return(json.dumps({'success': False, 'error-msg': 'Invalid session key.'}))

    nc_session = user_sessions[key]
    try:
        data = nc_session.rpcGet()
    except nc.ReplyError as e:
        reply = {'success': False, 'error': []}
        # NOTE(review): the loop body and the return of `reply` were missing
        # from the source. Each reported error is stored as its string form
        # -- confirm the intended per-error representation.
        for err in e.args[0]:
            reply['error'].append(str(err))
        # Return here: `data` is unbound on this path.
        return(json.dumps(reply))

    return(json.dumps({'success': True, 'data': json.loads(data.print_mem(ly.LYD_JSON, ly.LYP_WITHSIBLINGS))}))
def session_close():
    """
    Remove one of the caller's NETCONF sessions from the registry.

    The user comes from the ``Authorization`` header and the session is
    selected by the ``key`` query argument. The entry is deleted from
    ``sessions``; nothing else is done to the session object here.

    Returns:
        A JSON string: ``{'success': True}`` after the entry was removed, or
        ``{'success': False, 'error-msg': ...}`` when the key is missing from
        the request or unknown for this user.
    """
    caller = auth.lookup(request.headers.get('Authorization', None))['user']
    query = request.args.to_dict()

    if 'key' not in query:
        return(json.dumps({'success': False, 'error-msg': 'Missing session key.'}))

    # setdefault keeps the original side effect of creating an empty entry
    # for a user that has no sessions yet.
    owned = sessions.setdefault(caller.username, {})
    session_key = query['key']

    if session_key in owned:
        del owned[session_key]
        return(json.dumps({'success': True}))

    return(json.dumps({'success': False, 'error-msg': 'Invalid session key.'}))
def session_alive():
    """
    Report whether a session key is registered for the caller.

    The user comes from the ``Authorization`` header and the session is
    selected by the ``key`` query argument. Only presence of the key in
    ``sessions`` is checked; the session object itself is not touched.

    Returns:
        A JSON string: ``{'success': True}`` when the key is registered, or
        ``{'success': False, 'error-msg': ...}`` when the key is missing from
        the request or unknown for this user.
    """
    caller = auth.lookup(request.headers.get('Authorization', None))['user']
    query = request.args.to_dict()

    if 'key' not in query:
        return(json.dumps({'success': False, 'error-msg': 'Missing session key.'}))

    # setdefault keeps the original side effect of creating an empty entry
    # for a user that has no sessions yet.
    owned = sessions.setdefault(caller.username, {})

    if query['key'] in owned:
        outcome = {'success': True}
    else:
        outcome = {'success': False, 'error-msg': 'Invalid session key.'}
    return(json.dumps(outcome))