fix bitrot in util/zotsh

This commit is contained in:
zotlabs 2019-08-15 16:28:06 -07:00
parent 6f8c977e73
commit beeafc6bc5
10 changed files with 546 additions and 551 deletions

View File

@ -2,6 +2,7 @@
namespace Zotlabs\Module\Admin;
use App;
use \Zotlabs\Storage\GitRepo;
use \Michelf\MarkdownExtra;
@ -253,14 +254,14 @@ class Addons {
* Single plugin
*/
if (\App::$argc == 3){
$plugin = \App::$argv[2];
if (App::$argc == 3){
$plugin = App::$argv[2];
if (!is_file("addon/$plugin/$plugin.php")){
notice( t("Item not found.") );
return '';
}
$enabled = in_array($plugin,\App::$plugins);
$enabled = in_array($plugin,App::$plugins);
$info = get_plugin_info($plugin);
$x = check_plugin_versions($info);
@ -268,11 +269,11 @@ class Addons {
if($enabled && ! $x) {
$enabled = false;
$idz = array_search($plugin, \App::$plugins);
$idz = array_search($plugin, App::$plugins);
if ($idz !== false) {
unset(\App::$plugins[$idz]);
unset(App::$plugins[$idz]);
uninstall_plugin($plugin);
set_config("system","addon", implode(", ",\App::$plugins));
set_config("system","addon", implode(", ",App::$plugins));
}
}
$info['disabled'] = 1-intval($x);
@ -281,19 +282,19 @@ class Addons {
check_form_security_token_redirectOnErr('/admin/addons', 'admin_addons', 't');
$pinstalled = false;
// Toggle plugin status
$idx = array_search($plugin, \App::$plugins);
$idx = array_search($plugin, App::$plugins);
if ($idx !== false){
unset(\App::$plugins[$idx]);
unset(App::$plugins[$idx]);
uninstall_plugin($plugin);
$pinstalled = false;
info( sprintf( t("Plugin %s disabled."), $plugin ) );
} else {
\App::$plugins[] = $plugin;
App::$plugins[] = $plugin;
install_plugin($plugin);
$pinstalled = true;
info( sprintf( t("Plugin %s enabled."), $plugin ) );
}
set_config("system","addon", implode(", ",\App::$plugins));
set_config("system","addon", implode(", ",App::$plugins));
if($pinstalled) {
@require_once("addon/$plugin/$plugin.php");
@ -305,7 +306,7 @@ class Addons {
// display plugin details
if (in_array($plugin, \App::$plugins)){
if (in_array($plugin, App::$plugins)){
$status = 'on';
$action = t('Disable');
} else {
@ -380,18 +381,18 @@ class Addons {
list($tmp, $id) = array_map('trim', explode('/', $file));
$info = get_plugin_info($id);
$enabled = in_array($id,\App::$plugins);
$enabled = in_array($id,App::$plugins);
$x = check_plugin_versions($info);
// disable plugins which are installed but incompatible versions
if($enabled && ! $x) {
$enabled = false;
$idz = array_search($id, \App::$plugins);
$idz = array_search($id, App::$plugins);
if ($idz !== false) {
unset(\App::$plugins[$idz]);
unset(App::$plugins[$idz]);
uninstall_plugin($id);
set_config("system","addon", implode(", ",\App::$plugins));
set_config("system","addon", implode(", ",App::$plugins));
}
}
$info['disabled'] = 1-intval($x);

View File

@ -35,13 +35,6 @@ class Cloud extends \Zotlabs\Web\Controller {
if (argc() > 1)
$which = argv(1);
if (argc() < 2 && intval(get_config('system','cloud_disable_siteroot'))) {
notice( t('Permission denied.') . EOL);
construct_page();
killme();
}
$profile = 0;
if ($which)

View File

@ -95,6 +95,8 @@ class Dav extends \Zotlabs\Web\Controller {
$auth = new \Zotlabs\Storage\BasicAuth();
$auth->observer = get_observer_hash();
$auth->setRealm(ucfirst(\Zotlabs\Lib\System::get_platform_name()) . ' ' . 'WebDAV');
$rootDirectory = new \Zotlabs\Storage\Directory('/', $auth);

View File

@ -720,7 +720,11 @@ class Directory extends DAV\Node implements DAV\ICollection, DAV\IQuota, DAV\IMo
* @return array Directory[]
*/
function ChannelList(&$auth) {
$ret = array();
$ret = [];
if (intval(get_config('system','cloud_disable_siteroot'))) {
return $ret;
}
$r = q("SELECT channel_id, channel_address, profile.publish FROM channel left join profile on profile.uid = channel.channel_id WHERE channel_removed = 0 AND channel_system = 0 AND (channel_pageflags & %d) = 0",
intval(PAGE_HIDDEN)
@ -730,8 +734,7 @@ class Directory extends DAV\Node implements DAV\ICollection, DAV\IQuota, DAV\IMo
foreach ($r as $rr) {
if (perm_is_allowed($rr['channel_id'], $auth->observer, 'view_storage') && $rr['publish']) {
logger('found channel: /cloud/' . $rr['channel_address'], LOGGER_DATA);
// @todo can't we drop '/cloud'? It gets stripped off anyway in RedDirectory
$ret[] = new Directory('/cloud/' . $rr['channel_address'], $auth);
$ret[] = new Directory($rr['channel_address'], $auth);
}
}
}

View File

@ -16,6 +16,13 @@ Extract somewere and launch zotsh.py
Description
-----------
Update: 2019-08-14
Have just looked at this after several years of bitrot and made some updates.
it functions for cli DAV access on your assigned hub, but magic-auth to dav repos on other hubs
(e.g. the host command) needs to be updated to work with openwebauth.
----
ZotSH is a command line WebDAV client for Hubzilla.
It knows how to magic-auth to remote hubs using Zot.

Binary file not shown.

View File

@ -1,202 +1,202 @@
import requests
import platform
from numbers import Number
import xml.etree.cElementTree as xml
from collections import namedtuple
py_majversion, py_minversion, py_revversion = platform.python_version_tuple()
if py_majversion == '2':
from httplib import responses as HTTP_CODES
from urlparse import urlparse
else:
from http.client import responses as HTTP_CODES
from urllib.parse import urlparse
DOWNLOAD_CHUNK_SIZE_BYTES = 1 * 1024 * 1024
class WebdavException(Exception):
pass
class ConnectionFailed(WebdavException):
pass
def codestr(code):
return HTTP_CODES.get(code, 'UNKNOWN')
File = namedtuple('File', ['name', 'size', 'mtime', 'ctime', 'contenttype'])
def prop(elem, name, default=None):
child = elem.find('.//{DAV:}' + name)
return default if child is None else child.text
def elem2file(elem):
return File(
prop(elem, 'href'),
int(prop(elem, 'getcontentlength', 0)),
prop(elem, 'getlastmodified', ''),
prop(elem, 'creationdate', ''),
prop(elem, 'getcontenttype', ''),
)
class OperationFailed(WebdavException):
_OPERATIONS = dict(
HEAD = "get header",
GET = "download",
PUT = "upload",
DELETE = "delete",
MKCOL = "create directory",
PROPFIND = "list directory",
)
def __init__(self, method, path, expected_code, actual_code):
self.method = method
self.path = path
self.expected_code = expected_code
self.actual_code = actual_code
operation_name = self._OPERATIONS[method]
self.reason = 'Failed to {operation_name} "{path}"'.format(**locals())
expected_codes = (expected_code,) if isinstance(expected_code, Number) else expected_code
expected_codes_str = ", ".join('{0} {1}'.format(code, codestr(code)) for code in expected_codes)
actual_code_str = codestr(actual_code)
msg = '''\
{self.reason}.
Operation : {method} {path}
Expected code : {expected_codes_str}
Actual code : {actual_code} {actual_code_str}'''.format(**locals())
super(OperationFailed, self).__init__(msg)
class Client(object):
def __init__(self, host, port=0, auth=None, username=None, password=None,
protocol='http', verify_ssl=True, path=None, cert=None, session=None):
if not port:
port = 443 if protocol == 'https' else 80
self.baseurl = '{0}://{1}:{2}'.format(protocol, host, port)
if path:
self.baseurl = '{0}/{1}'.format(self.baseurl, path)
self.cwd = '/'
if session is None:
self.session = requests.session()
else:
self.session = session
self.session.verify = verify_ssl
self.session.stream = True
if cert:
self.session.cert = cert
if auth:
self.session.auth = auth
elif username and password:
self.session.auth = (username, password)
def _send(self, method, path, expected_code, **kwargs):
url = self._get_url(path).strip(".")
#~ print self.session
#~ print self.session.verify
#~ print self.session.params
#~ print self.session.cookies
response = self.session.request(method, url, allow_redirects=False, **kwargs)
#~ print response.request.method
#~ print response.request.url
if isinstance(expected_code, Number) and response.status_code != expected_code \
or not isinstance(expected_code, Number) and response.status_code not in expected_code:
raise OperationFailed(method, path, expected_code, response.status_code)
return response
def _get_url(self, path):
path = str(path).strip()
if path.startswith('/'):
return self.baseurl + path
return "".join((self.baseurl, self.cwd, path))
def cd(self, path):
path = path.strip()
if not path:
return
stripped_path = '/'.join(part for part in path.split('/') if part) + '/'
if stripped_path == '/':
self.cwd = stripped_path
elif path.startswith('/'):
self.cwd = '/' + stripped_path
elif stripped_path == "./":
return
elif stripped_path == "../":
self.cwd ='/'.join( self.cwd.split('/')[:-2] ) + '/'
else:
self.cwd += stripped_path
def mkdir(self, path, safe=False):
expected_codes = 201 if not safe else (201, 301, 405)
self._send('MKCOL', path, expected_codes)
def mkdirs(self, path):
dirs = [d for d in path.split('/') if d]
if not dirs:
return
if path.startswith('/'):
dirs[0] = '/' + dirs[0]
old_cwd = self.cwd
try:
for dir in dirs:
try:
self.mkdir(dir, safe=True)
except Exception as e:
if e.actual_code == 409:
raise
finally:
self.cd(dir)
finally:
self.cd(old_cwd)
def rmdir(self, path, safe=False):
path = str(path).rstrip('/') + '/'
expected_codes = 204 if not safe else (204, 404)
self._send('DELETE', path, expected_codes)
def delete(self, path):
self._send('DELETE', path, 204)
def upload(self, local_path_or_fileobj, remote_path):
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'rb') as f:
self._upload(f, remote_path)
else:
self._upload(local_path_or_fileobj, remote_path)
def _upload(self, fileobj, remote_path):
self._send('PUT', remote_path, (200, 201, 204), data=fileobj)
def download(self, remote_path, local_path_or_fileobj):
response = self._send('GET', remote_path, 200, stream=True)
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'wb') as f:
self._download(f, response)
else:
self._download(local_path_or_fileobj, response)
def _download(self, fileobj, response):
for chunk in response.iter_content(DOWNLOAD_CHUNK_SIZE_BYTES):
fileobj.write(chunk)
def ls(self, remote_path='.'):
headers = {'Depth': '1'}
response = self._send('PROPFIND', remote_path, (207, 301), headers=headers)
# Redirect
if response.status_code == 301:
url = urlparse(response.headers['location'])
return self.ls(url.path)
tree = xml.fromstring(response.content)
return [elem2file(elem) for elem in tree.findall('{DAV:}response')]
def exists(self, remote_path):
response = self._send('HEAD', remote_path, (200, 301, 404))
return True if response.status_code != 404 else False
import requests
import platform
from numbers import Number
import xml.etree.cElementTree as xml
from collections import namedtuple
py_majversion, py_minversion, py_revversion = platform.python_version_tuple()
if py_majversion == '2':
from httplib import responses as HTTP_CODES
from urlparse import urlparse
else:
from http.client import responses as HTTP_CODES
from urllib.parse import urlparse
DOWNLOAD_CHUNK_SIZE_BYTES = 1 * 1024 * 1024
class WebdavException(Exception):
pass
class ConnectionFailed(WebdavException):
pass
def codestr(code):
return HTTP_CODES.get(code, 'UNKNOWN')
File = namedtuple('File', ['name', 'size', 'mtime', 'ctime', 'contenttype'])
def prop(elem, name, default=None):
child = elem.find('.//{DAV:}' + name)
return default if child is None or child.text is None else child.text
def elem2file(elem):
return File(
prop(elem, 'href'),
int(prop(elem, 'getcontentlength', 0)),
prop(elem, 'getlastmodified', ''),
prop(elem, 'creationdate', ''),
prop(elem, 'getcontenttype', ''),
)
class OperationFailed(WebdavException):
_OPERATIONS = dict(
HEAD = "get header",
GET = "download",
PUT = "upload",
DELETE = "delete",
MKCOL = "create directory",
PROPFIND = "list directory",
)
def __init__(self, method, path, expected_code, actual_code):
self.method = method
self.path = path
self.expected_code = expected_code
self.actual_code = actual_code
operation_name = self._OPERATIONS[method]
self.reason = 'Failed to {operation_name} "{path}"'.format(**locals())
expected_codes = (expected_code,) if isinstance(expected_code, Number) else expected_code
expected_codes_str = ", ".join('{0} {1}'.format(code, codestr(code)) for code in expected_codes)
actual_code_str = codestr(actual_code)
msg = '''\
{self.reason}.
Operation : {method} {path}
Expected code : {expected_codes_str}
Actual code : {actual_code} {actual_code_str}'''.format(**locals())
super(OperationFailed, self).__init__(msg)
class Client(object):
def __init__(self, host, port=0, auth=None, username=None, password=None,
protocol='http', verify_ssl=True, path=None, cert=None, session=None):
if not port:
port = 443 if protocol == 'https' else 80
self.baseurl = '{0}://{1}:{2}'.format(protocol, host, port)
if path:
self.baseurl = '{0}/{1}'.format(self.baseurl, path)
self.cwd = '/'
if session is None:
self.session = requests.session()
else:
self.session = session
self.session.verify = verify_ssl
self.session.stream = True
if cert:
self.session.cert = cert
if auth:
self.session.auth = auth
elif username and password:
self.session.auth = (username, password)
def _send(self, method, path, expected_code, **kwargs):
url = self._get_url(path).strip(".")
#~ print self.session
#~ print self.session.verify
#~ print self.session.params
#~ print self.session.cookies
response = self.session.request(method, url, allow_redirects=False, **kwargs)
#~ print response.request.method
#~ print response.request.url
if isinstance(expected_code, Number) and response.status_code != expected_code \
or not isinstance(expected_code, Number) and response.status_code not in expected_code:
raise OperationFailed(method, path, expected_code, response.status_code)
return response
def _get_url(self, path):
path = str(path).strip()
if path.startswith('/'):
return self.baseurl + path
return "".join((self.baseurl, self.cwd, path))
def cd(self, path):
path = path.strip()
if not path:
return
stripped_path = '/'.join(part for part in path.split('/') if part) + '/'
if stripped_path == '/':
self.cwd = stripped_path
elif path.startswith('/'):
self.cwd = '/' + stripped_path
elif stripped_path == "./":
return
elif stripped_path == "../":
self.cwd ='/'.join( self.cwd.split('/')[:-2] ) + '/'
else:
self.cwd += stripped_path
def mkdir(self, path, safe=False):
expected_codes = 201 if not safe else (201, 301, 405)
self._send('MKCOL', path, expected_codes)
def mkdirs(self, path):
dirs = [d for d in path.split('/') if d]
if not dirs:
return
if path.startswith('/'):
dirs[0] = '/' + dirs[0]
old_cwd = self.cwd
try:
for dir in dirs:
try:
self.mkdir(dir, safe=True)
except Exception as e:
if e.actual_code == 409:
raise
finally:
self.cd(dir)
finally:
self.cd(old_cwd)
def rmdir(self, path, safe=False):
path = str(path).rstrip('/') + '/'
expected_codes = 204 if not safe else (204, 404)
self._send('DELETE', path, expected_codes)
def delete(self, path):
self._send('DELETE', path, 204)
def upload(self, local_path_or_fileobj, remote_path):
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'rb') as f:
self._upload(f, remote_path)
else:
self._upload(local_path_or_fileobj, remote_path)
def _upload(self, fileobj, remote_path):
self._send('PUT', remote_path, (200, 201, 204), data=fileobj)
def download(self, remote_path, local_path_or_fileobj):
response = self._send('GET', remote_path, 200, stream=True)
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'wb') as f:
self._download(f, response)
else:
self._download(local_path_or_fileobj, response)
def _download(self, fileobj, response):
for chunk in response.iter_content(DOWNLOAD_CHUNK_SIZE_BYTES):
fileobj.write(chunk)
def ls(self, remote_path='.'):
headers = {'Depth': '1'}
response = self._send('PROPFIND', remote_path, (207, 301), headers=headers)
# Redirect
if response.status_code == 301:
url = urlparse(response.headers['location'])
return self.ls(url.path)
tree = xml.fromstring(response.content)
return [elem2file(elem) for elem in tree.findall('{DAV:}response')]
def exists(self, remote_path):
response = self._send('HEAD', remote_path, (200, 301, 404))
return True if response.status_code != 404 else False

Binary file not shown.

View File

@ -1,324 +1,313 @@
#!/usr/bin/env python2
import sys, os
import ConfigParser
import requests
from requests.auth import HTTPBasicAuth
import easywebdav
import easywebdav.__version__ as easywebdavversion
__version__= "0.0.2"
SERVER = None
USER = None
PASSWD = None
VERIFY_SSL=True
#####################################################
class CommandNotFound(Exception):
pass
class ZotSH(object):
commands = ['cd','ls','exists','mkdir','mkdirs','rmdir','delete','upload','download',
'host', 'pwd','cat',
'lcd','lpwd', 'lls',
'quit', 'help']
def __init__(self, host, session=None, davclient=None):
self.sessions = {}
self.host = host
self.session = session
self.davclient = davclient
@property
def host(self):
return self._host
@host.setter
def host(self, host):
self._host = host
self._hostname = host.replace("https:","").replace("/","")
@property
def hostname(self):
return self._hostname
@hostname.setter
def hostname(self, hostname):
self._host = "https://%s/" % (hostname)
self._hostname = hostname
@property
def session(self):
return self._session
@session.setter
def session(self, session):
self._session = session
self.davclient = easywebdav.connect( self.hostname, protocol='https', session=session, path="cloud", verify_ssl=VERIFY_SSL)
@property
def PS1(self):
if self.davclient is None:
return "[!]> "
return "%s:%s> " % (self.hostname, self.davclient.cwd)
def get_host_session(self, host=None):
#~ if host is None:
#~ host = self.host
#~ if not host.startswith("https"):
#~ host = "https://%s/" % (host)
#~ if host in self.sessions:
#~ session = self.sessions[host]
#~ else:
#~ session = requests.Session()
#~ self.sessions[host] = session
#~ if not host == SERVER
#~ session.params.update({'davguest':1})
#~ return session
if self.session is None:
session = requests.Session()
#session.params.update({'davguest':1})
else:
session = self.session
session.params.update({'davguest': (not host == SERVER) })
return session
def do(self, command, *args):
if not command in self.commands:
raise CommandNotFound("Unknow command '%s'" % command)
cmd = getattr(self, "cmd_%s"%command, None)
if cmd is None:
cmd = getattr(self.davclient, command)
return cmd(*args)
def cmd_exists(self, *args):
if (len(args)==0):
return
return self.davclient.exists(args[0])
def cmd_mkdir(self, *args):
if (len(args)==0):
return
return self.davclient.mkdir(args[0])
def cmd_mkdirs(self, *args):
if (len(args)==0):
return
return self.davclient.mkdirs(args[0])
def cmd_rmdir(self, *args):
if (len(args)==0):
return
return self.davclient.rmdir(args[0])
def cmd_delete(self, *args):
if (len(args)==0):
return
return self.davclient.delete(args[0])
def cmd_upload(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.upload(args[0], args[1])
def cmd_download(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.download(args[0], args[1])
def cmd_host(self, *args):
if (len(args)==0):
return
newhostname = args[0]
newhost = "https://%s/" % newhostname
if newhostname == "~" or newhost == SERVER:
# bach to home server
self.host = SERVER
self.session = self.get_host_session(SERVER)
return
session_remote = self.get_host_session(newhost)
session_home = self.get_host_session(SERVER)
# call /magic on SERVER
r = session_home.get(
SERVER + "magic",
params={'dest': newhost},
allow_redirects=False,
verify=VERIFY_SSL )
if not 'location' in r.headers:
raise Exception("Cannot start magic auth to '%s'" % newhostname)
auth_url = r.headers['location']
# call auth_url with "test" param
r = session_remote.get(
auth_url,
params={'test': 1 },
verify=VERIFY_SSL )
if r.json()['success']:
self.hostname = newhostname
self.session = session_remote
else:
raise Exception("Cannot magic auth to '%s'" % newhostname)
def cmd_pwd(self, *args):
return "%s%s" % ( self.davclient.baseurl, self.davclient.cwd )
def cmd_ls(self, *args):
extra_args = ["-a", "-l", "-d"]
show_hidden = "-a" in args
show_list = "-l" in args
show_only_dir = "-d" in args
args = [ a for a in args if not a in extra_args ]
r = self.davclient.ls(*args)
l = max([ len(str(f.size)) for f in r ] + [7,])
def _fmt(type, size, name):
if show_list:
return "%s %*d %s" % (type, l, f.size , name)
else:
return name
if show_hidden :
print _fmt('d', 0, "./")
if self.davclient.cwd!="/":
print _fmt('d', 0, "../")
for f in r:
name = f.name.replace("/cloud"+self.davclient.cwd,"")
type = "-"
if name.endswith("/"):
type = "d"
if name!="":
if show_hidden or not name.startswith("."):
if not show_only_dir or type=="d":
print _fmt(type, f.size , name)
def cmd_lpwd(self, *args):
return os.getcwd()
def cmd_lcd(self, *args):
if (len(args)==0):
return
os.chdir(args[0])
def cmd_lls(self, *args):
for f in os.listdir(os.getcwd()):
if os.path.isdir(f):
f=f+"/"
print f
def cmd_help(self, *args):
print "ZotSH",__version__
print
print "Commands:"
for c in self.commands:
print "\t",c
print
print "easywebdav", easywebdavversion.__version__, "(mod)"
print "requests", requests.__version__
def cmd_cat(self,*args):
if (len(args)==0):
return
rfile = args[0]
resp = self.davclient._send('GET', rfile, (200,))
print resp.text
def load_conf():
global SERVER,USER,PASSWD,VERIFY_SSL
homedir = os.getenv("HOME")
if homedir is None:
homedir = os.path.join(os.getenv("HOMEDRIVE"), os.getenv("HOMEPATH"))
optsfile = ".zotshrc"
if not os.path.isfile(optsfile):
optsfile = os.path.join(homedir, ".zotshrc")
if not os.path.isfile(optsfile):
print "Please create a configuration file called '.zotshrc':"
print "[zotsh]"
print "host = https://yourhost.com/"
print "username = your_username"
print "password = your_password"
sys.exit(-1)
config = ConfigParser.ConfigParser()
config.read(optsfile)
SERVER = config.get('zotsh', 'host')
USER = config.get('zotsh', 'username')
PASSWD = config.get('zotsh', 'password')
if config.has_option('zotsh', 'verify_ssl'):
VERIFY_SSL = config.getboolean('zotsh', 'verify_ssl')
def zotsh():
zotsh = ZotSH( SERVER)
session_home = zotsh.get_host_session()
#~ #login on home server
print "loggin in..."
r = session_home.get(
SERVER + "api/account/verify_credentials",
auth=HTTPBasicAuth(USER, PASSWD),
verify=VERIFY_SSL )
print "Hi", r.json()['name']
zotsh.session = session_home
# command loop
input = raw_input(zotsh.PS1)
while (input != "quit"):
input = input.strip()
if len(input)>0:
toks = [ x.strip() for x in input.split(" ") ]
command = toks[0]
args = toks[1:]
try:
ret = zotsh.do(command, *args)
except easywebdav.client.OperationFailed, e:
print e
except CommandNotFound, e:
print e
else:
if ret is not None:
print ret
input = raw_input(zotsh.PS1)
if __name__=="__main__":
load_conf()
zotsh()
sys.exit()
#!/usr/bin/env python
import sys, os
import ConfigParser
import requests
from requests.auth import HTTPBasicAuth
import easywebdav
import easywebdav.__version__ as easywebdavversion
import base64
__version__= "0.0.2"
SERVER = None
USER = None
PASSWD = None
VERIFY_SSL=True
#####################################################
class CommandNotFound(Exception):
pass
class ZotSH(object):
commands = ['cd','ls','exists','mkdir','mkdirs','rmdir','delete','upload','download',
'host', 'pwd','cat',
'lcd','lpwd', 'lls',
'quit', 'help']
def __init__(self, host, session=None, davclient=None):
self.sessions = {}
self.host = host
self.session = session
self.davclient = davclient
@property
def host(self):
return self._host
@host.setter
def host(self, host):
self._host = host
self._hostname = host.replace("https:","").replace("/","")
@property
def hostname(self):
return self._hostname
@hostname.setter
def hostname(self, hostname):
self._host = "https://%s/" % (hostname)
self._hostname = hostname
@property
def session(self):
return self._session
@session.setter
def session(self, session):
self._session = session
self.davclient = easywebdav.connect( self.hostname, protocol='https', session=session, path="dav", verify_ssl=VERIFY_SSL)
@property
def PS1(self):
if self.davclient is None:
return "[!]> "
return "%s:%s> " % (self.hostname, self.davclient.cwd)
def get_host_session(self, host=None):
#~ if host is None:
#~ host = self.host
#~ if not host.startswith("https"):
#~ host = "https://%s/" % (host)
#~ if host in self.sessions:
#~ session = self.sessions[host]
#~ else:
#~ session = requests.Session()
#~ self.sessions[host] = session
#~ if not host == SERVER
#~ session.params.update({'davguest':1})
#~ return session
if self.session is None:
session = requests.Session()
#session.params.update({'davguest':1})
else:
session = self.session
#session.params.update({'davguest': (not host == SERVER) })
return session
def do(self, command, *args):
if not command in self.commands:
raise CommandNotFound("Unknown command '%s'" % command)
cmd = getattr(self, "cmd_%s"%command, None)
if cmd is None:
cmd = getattr(self.davclient, command)
return cmd(*args)
def cmd_exists(self, *args):
if (len(args)==0):
return
return self.davclient.exists(args[0])
def cmd_mkdir(self, *args):
if (len(args)==0):
return
return self.davclient.mkdir(args[0])
def cmd_mkdirs(self, *args):
if (len(args)==0):
return
return self.davclient.mkdirs(args[0])
def cmd_rmdir(self, *args):
if (len(args)==0):
return
return self.davclient.rmdir(args[0])
def cmd_delete(self, *args):
if (len(args)==0):
return
return self.davclient.delete(args[0])
def cmd_upload(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.upload(args[0], args[1])
def cmd_download(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.download(args[0], args[1])
def cmd_host(self, *args):
if (len(args)==0):
return
newhostname = args[0]
newhost = "https://%s/" % newhostname
if newhostname == "~" or newhost == SERVER:
# bach to home server
self.host = SERVER
self.session = self.get_host_session(SERVER)
return
session_remote = self.get_host_session(newhost)
session_home = self.get_host_session(SERVER)
bnewhost = newhost + 'dav'
bnewhost = bnewhost.encode('hex')
r = session_home.get(
SERVER + "magic",
params={'bdest': bnewhost, 'owa': 1},
allow_redirects=True,
verify=VERIFY_SSL )
self.hostname = newhostname
self.session = session_remote
def cmd_pwd(self, *args):
return "%s%s" % ( self.davclient.baseurl, self.davclient.cwd )
def cmd_ls(self, *args):
extra_args = ["-a", "-l", "-d"]
show_hidden = "-a" in args
show_list = "-l" in args
show_only_dir = "-d" in args
args = [ a for a in args if not a in extra_args ]
r = self.davclient.ls(*args)
l = max([ len(str(f.size)) for f in r ] + [7,])
def _fmt(type, size, name):
if show_list:
return "%s %*d %s" % (type, l, f.size , name)
else:
return name
if show_hidden :
print _fmt('d', 0, "./")
if self.davclient.cwd!="/":
print _fmt('d', 0, "../")
for f in r:
name = f.name.replace("/dav"+self.davclient.cwd,"")
type = "-"
if name.endswith("/"):
type = "d"
if name!="":
if show_hidden or not name.startswith("."):
if not show_only_dir or type=="d":
print _fmt(type, f.size , name)
def cmd_lpwd(self, *args):
return os.getcwd()
def cmd_lcd(self, *args):
if (len(args)==0):
return
os.chdir(args[0])
def cmd_lls(self, *args):
for f in os.listdir(os.getcwd()):
if os.path.isdir(f):
f=f+"/"
print f
def cmd_help(self, *args):
print "ZotSH",__version__
print
print "Commands:"
for c in self.commands:
print "\t",c
print
print "easywebdav", easywebdavversion.__version__, "(mod)"
print "requests", requests.__version__
def cmd_cat(self,*args):
if (len(args)==0):
return
rfile = args[0]
resp = self.davclient._send('GET', rfile, (200,))
print resp.text
def load_conf():
global SERVER,USER,PASSWD,VERIFY_SSL
homedir = os.getenv("HOME")
if homedir is None:
homedir = os.path.join(os.getenv("HOMEDRIVE"), os.getenv("HOMEPATH"))
optsfile = ".zotshrc"
if not os.path.isfile(optsfile):
optsfile = os.path.join(homedir, ".zotshrc")
if not os.path.isfile(optsfile):
print "Please create a configuration file called '.zotshrc':"
print "[zotsh]"
print "host = https://yourhost.com/"
print "username = your_username"
print "password = your_password"
sys.exit(-1)
config = ConfigParser.ConfigParser()
config.read(optsfile)
SERVER = config.get('zotsh', 'host')
USER = config.get('zotsh', 'username')
PASSWD = config.get('zotsh', 'password')
if config.has_option('zotsh', 'verify_ssl'):
VERIFY_SSL = config.getboolean('zotsh', 'verify_ssl')
def zotsh():
zotsh = ZotSH( SERVER)
session_home = zotsh.get_host_session()
#~ #login on home server
print "loggin in..."
r = session_home.get(
SERVER + "api/account/verify_credentials",
auth=HTTPBasicAuth(USER, PASSWD),
verify=VERIFY_SSL )
print "Hi", r.json()['name']
zotsh.session = session_home
# command loop
input = raw_input(zotsh.PS1)
while (input != "quit"):
input = input.strip()
if len(input)>0:
toks = [ x.strip() for x in input.split(" ") ]
command = toks[0]
args = toks[1:]
try:
ret = zotsh.do(command, *args)
except easywebdav.client.OperationFailed, e:
print e
except CommandNotFound, e:
print e
else:
if ret is not None:
print ret
input = raw_input(zotsh.PS1)
if __name__=="__main__":
load_conf()
zotsh()
sys.exit()