用python開發一款操作MySQL的小工具
https://github.com/lishukan/directsql
安裝pip3 install directsql導入
directsql 目前只提供三個外部類
__all__=['SqlGenerator','MysqlConnection','MysqlPool']
導入方式
from directsql.sqlgenerator import SqlGenerator #該類用于生成sql語句#下面是一個池化連接對象MysqlPool 和一個簡單連接對象 MysqlConnectorfrom directsql.connector import MysqlConnection,MysqlConnector 使用1 創建連接
# 1. 傳入有名參數 conn = MysqlConnection(host=’127.0.0.1’, port=3306, password=’123456’, database=’test_base’) print(conn.database) conn=MysqlPool(host=’127.0.0.1’, port=3306, password=’123456’, database=’test_base’) # 也可使用 直接 參數字典 conn_args = {’host’: ’127.0.0.1’,’port’: 3306,’password’: ’123456’,’database’:’test_base’, } conn = MysqlConnection(**conn_args)#單個連接 print(conn.database) conn = MysqlPool(**conn_args) #池化連接對象 print(conn.database) #2 直接使用 字符串 #以下字符串是常用的終端 連接命令 string_arg='mysql -uroot -h127.0.0.1 -P3306 -p123456 -Dtest_base' conn = MysqlConnection(string_arg=string_arg) print(conn.database) conn = MysqlPool(string_arg=string_arg) print(conn.database) 2 執行sql語句
事實上directsql 封裝了 很多 語句。可以滿足很大一部分日常使用場景。但是如果有復雜的語句,仍然需要調用原生的sql 執行。而且directsql 中很多封裝好的方法是先拼接sql 再 調用該語句,所以這里還是先簡單介紹下,directsql 如何執行原生sql。
無論是 MysqlConnection 類 還是 MysqlPool 類 都通過 execute_sql 方法 來執行sql。
例如 :
id name age 1 羅輯 28 2 莊顏 25 3 葉文潔 54 4 程心 25 5 云天明 27
conn = MysqlConnection(string_arg='mysql -uroot -h127.0.0.1 -P3306 -p123456 -Dtest')result,count=conn.execute_sql('select * from test_table ')print(result)print(count)>>> ((1, ’羅輯’, ’28’), (2, ’莊顏’, ’25’), (3, ’葉文潔’, ’54’), (4, ’程心’, ’25’), (5, ’云天明’, ’27’))>>> 5 #這里默認是普通游標,你也可以指定使用字典游標:result, count = conn.execute_sql('select * from test_table ', cursor_type=’dict’)>>>[{’ID’: 1, ’name’: ’羅輯’, ’age’: ’28’}, {’ID’: 2, ’name’: ’莊顏’, ’age’: ’25’}, {’ID’: 3, ’name’: ’葉文潔’, ’age’: ’54’}, {’ID’: 4, ’name’: ’程心’, ’age’: ’25’}, {’ID’: 5, ’name’: ’云天明’, ’age’: ’27’}]>>>5
execute_sql 方法 返回的是一個元組,(結果集,條數)
下文出現的所有方法無特殊說明都是返回元組,且支持dict游標
附帶參數執行語句
這里的參數使用起來和 pymysql 提供的 execute 以及executemany 沒有任何 差別,以下簡單提供幾個示例:
#傳元組result,count=conn.execute_sql('select * from test_table where age=%s ',param=(25,))#傳字典result, count = conn.execute_sql('select * from test_table where age=%(age)s ', param={’age’: 25})#元組列表result, count = conn.execute_sql('insert into test_table(`age`,`name`)values(%s,%s) ', param=[(’宋運輝’, 37), (’程開顏’, 33)])#字典列表result, count = conn.execute_sql('insert into test_table(`age`,`name`)values(%(age)s,%(name)s) ',param=[ {'name':'宋運輝',’age’:37}, {'name':'程開顏',’age’:33} ])3 select 方法
select 方法 可以接受多參數,參數列表如下。
def select(self, columns=’id’, table=None, where=None, group_by: str = None, order_by: str = None, limit: int = None, offset=None,cursor_type=None):
》》》 conn.select(’*’, ’test_table’)
select id from test_table where age=25》》》 conn.select(’*’, ’test_table’, where={’age’: 25})
select name,age from test_table where age=25 and id=2多字段直接傳入字符串
》》》 conn.select('age,name', ’test_table’, where={’age’: 25,’id’:2})
傳入列表/元組
》》》 conn.select([’age’,’name’], ’test_table’, where={’age’: 25,’id’:2})
select * from test_table group by id order by age desc limit 1 offset 1》》》conn.select(’*’, ’test_table’, order_by=’age desc’,group_by=’id’,limit=1,offset=1)
select 功能看起來甚至不如直接寫原生sql 快,但是如果查詢條件是在不斷變化的,尤其是where條件,那么使用select 方法 會比自行拼接更方便。
例如,需要不斷地讀取一個字典變量,然后根據這個變量中的條件去查詢數據,而這個字典的鍵個數會變化,但是鍵都恰好是表的字段。這個時候使用select 方法會十分簡便,只需要令where參數等于那個字典即可。
平心而論,這個方法確實用處不大。
4 insert_into 方法def insert_into(self, table, data: dict or list, columns=None, ignroe=False, on_duplicate_key_update: str = None, return_id=False):
該方法可以接受傳入字典或者 字典列表,并且可選 返回 游標影響的條數 或者是 新插入的數據的id。
columns 為空時,將取第一條數據的所有鍵,此時請確保所有數據鍵相同。
#傳入 字典data_1 = {'age': 44, ’name’: '雷東寶'}count = conn.insert_into(’test_table’, data_1)#默認返回受影響條數print(count) #>>> 1 return_id = conn.insert_into(’test_table’, data_1,return_id=True)# 可選返回idprint(return_id)>>>22533 #傳入字典列表data_2={'age': 22, ’name’: '宋運萍'}all_data=[data_1,data_2]count = conn.insert_into(’test_table’, all_data)#限定 插入的字段。(字典有多字段,但是只需要往表里插入指定的字段時)data_3= {'age': 44, ’name’: '雷東寶','title':'村支書'} #title不需要,只要age和namecount = conn.insert_into(’test_table’, data_1,columns=['age','name'] )#ignore 參數data_1 = {'age': 44, ’name’: '雷東寶','id':22539}count = conn.insert_into(’test_table’,ignore=True )print(count)>>> 0 # 由于表中id 22539 已經存在,該條記錄不會插入,影響 0條數據#on_duplicate_key_update 參數data_1 = {'age': 44, ’name’: '雷東寶','id':22539} #id=22539 已經存在count = conn.insert_into(’test_table’, data_1,on_duplicate_key_update=’ name='雷copy' ’)print(count)#返回影響條數>>>2 #嘗試插入一條,但是發生重復,于是刪除新數據,并更新舊數據。實際上影響了兩條。
在insert_into 方法中提供了 on_duplicate_key_update 參數,但是實際上使用起來比較雞肋,需要自己傳入 on_duplicate_key_update 后的語句進行拼接。
如果你僅僅只是需要在發生重復時將舊數據的特定字段更新為新數據對應字段的值時。merge_into 方法更適合。
5 merge_into 方法在 其他關系型數據庫中,提供有merge into 的語法,但是mysql 中沒有提供。 不過這里我們通過insert 和 on_duplicate_key_update 語法 封裝出了一個 類似merge_into 的方法。 該方法返回的是影響的條數
def* merge_into(self, table, data, columns=None, need_merge_columns: list = None):
columns 為空時,將取第一條數據的所有鍵,此時請確保所有數據鍵相同。
need_merge_columns 為在發生重復時需要替換(覆蓋)的字段。
data_1 = {'age': 44, ’name’: '雷東寶','id':22539}data_2={'age': 22, ’name’: '宋運萍','id':22540}all_data = [data_1, data_2,]count=conn.merge_into(’test_table’,all_data,need_merge_columns=[’name’,])print(count)>>>4#兩條數據正好都是重復的,插入兩條又刪除后修改兩條 ,返回46 replace_into 方法
該方法簡單,不做過多說明。該方法 返回的是影響的條數
def replace_into(self,table, data: dict or list, columns=None)
data_1 = {'age': 44, ’name’: '雷東寶','id':22539}data_2={'age': 22, ’name’: '宋運萍','id':22540}all_data = [data_1, data_2,]count=conn.replace_into(’test_table’,all_data)7 update 方法
def update(self,table, data: dict, where, columns: None or list = None, limit=None):
該方法data 參數只接受傳入字典。該方法 返回的是影響的條數
data_1 = {'age': 44, ’name’: '雷copy'}count=conn.update(’test_table’,data_1,where={’id’:22539}) #更新 id=22539的數據為 新的data_1print(count)>>>1
除此之外,還提供了一個衍生的方法
def update_by_primary(self, table, data: dict, pri_value, columns=None, primary: str = ’id’):
用于通過主鍵去更新數據。pri_value 即為主鍵的值。primary 為主鍵,默認為id
data_1 = {'age': 44, ’name’: '雷cpy'}count=conn.update_by_primary(’test_table’,data_1,pri_value=22539)8 delete 方法
def delete_by_primary(self, table, pri_value, primary=’id’):'''通過主鍵刪除數據'''def delete(self,table, where: str or dict, limit: int = 0):'''通過where條件刪除數據'''count=conn.delete(’test_table’,where={’name’:’雷東寶’}) #刪除name=雷東寶的數據count=conn.delete_by_primary(’test_table’,pri_value=22539) #刪除主鍵等于22539 的數據9 使用 事務
def do_transaction(self, sql_params: list, cursor_type=None):
sql_params 為 元組列表。 【(sql_1,param_1),(sql_2,param_2】
如果sql 不需要參數也要傳入 None ,如 【(sql_1,None),】
sql_params = [('update test_table set name=%(name)s where id=%(id)s ', {’name’: ’洛基’, ’id’: 22539}),('update test_table set name=%(name)s where id=%(id)s ', {’name’: ’mask’, ’id’: 22540}), ]count=conn.do_transaction(sql_params)>>>((), 1) #返回最后一條執行語句的 結果和影響條數10 讀取流式游標結果
def read_ss_result(self, sql, param=None, cursor_type=’ss’):
cursor_type 可選 ss 和 ssdict
注意,該方法返回的是 生成器對象,拿到結果需要不斷進行遍歷。
result=conn.read_ss_result('select * from test_table')for data in result:print(data)
以上就是python開發一款操作MySQL的小工具的詳細內容,更多關于python 操作MySQL的資料請關注好吧啦網其它相關文章!
相關文章: