scylladb 驱动使用方法

本文主要总结下 scylladb 的c/c++驱动使用详细方法与步骤,便于快速进行开发。

1. 基本使用步骤

创建 cluster

  1. 创建方法:
    CassCluster * cluster = cass_cluster_new();
    cass_cluster_free(cluster); //不用时释放申请的资源
  2. 对cluster进行参数设置

根据需要对cluster 进行各种相关参数的设置,主要进行以下设置

1
2
3
4
5
cass_cluster_set_port(cluster, port);  //设置集群连接的端口号
cass_cluster_set_contact_points(cluster, addr); //设置集群连接地址,只需要指定一个地址即可
cass_cluster_set_credentials(cluster, user, pass); //设置集群连接时的认证信息(用户名、密码)
cass_cluster_set_num_threads_io(cluster, num); //设置处理io的线程数,,默认2个
cass_cluster_set_core_connections_per_host(cluster, count); // 设置集群中每个服务端的连接数,默认为1个

总的io连接数=io线程数 × 每个服务端的连接数 × 集群服务端个数

创建 session

CassSession维护每个节点的连接以及可调整的I / O线程池,以根据负载平衡策略进行查询。 因为CassSession是线程安全的,所以通常建议您为每个 keyspace 创建一个会话,并在应用程序线程之间共享它。

1
2
CassSession * session = cass_session_new();
cass_session_free(session); //不用时释放申请的资源

执行操作语句

执行操作主要按以下步骤进行处理

  1. 创建操作执行对象
    1
    CassStatement * statement = cass_statement_new(qstr.c_str(), 0);
    第一个参数为要执行的操作语句,第二个参数代表要使用的占位符个数,在批量循环处理的时候使用占位符时使用,
  2. 执行具体操作语句
    1
    CassFuture * futrue = cass_session_execute(session, statement);
  3. 等待执行结果
    1
    cass_future_wait(future);
  4. 判定执行结果
    1
    2
    3
    4
    5
    if (CASS_OK != cass_future_error_code(future)) {
    // 异常处理
    } else {
    // 成功执行相关语句
    }
  5. 释放执行资源
    1
    2
    cass_statement_free(statement);
    cass_future_free(future);

2. 封装库示例

头文件db_scylla.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

#include <vector>
#include <cassandra.h>

namespace DB {

class ed_error;

class scylla_obj
{ //数据库查询查询传递对象
public:
scylla_obj() {}
~scylla_obj () {}

public:
std::string uri_; //uri
std::vector<std::string> vidc_; //idc列表
};
class record_scylla : public record_base
{
public:
record_scylla();
~record_scylla();

public:
std::string uri_; //uri
std::string vidc_; //vidc
std::string cache_; //cache
std::string zone_; //zone
int type_; //type
};

class db_scylla
{
public:
db_scylla ();
~db_scylla ();

/**
* 打开数据库
* @param addr 要打开的地址
* @param port 要打开的端口号
* @return true 成功打开,false 打开失败
*/
bool open(const char * addr, int port);

/**
* 查询内容
* @param obj 要查询的信息
* @param bhot 是否是查询热点数据
* @param vec 返回查询结果
* @param err 错误时返回错误详细信息
* @return true 成功查询,false 未查询到
*/
bool query(const scylla_obj & obj, bool bhot,
std::vector<record_scylla> & vec, ed_error * err);

/**
* 批量插入数据
* @param vec 要插入的数据总量
* @param ttl 过期时间 <=0 则永不过期,单位秒
* @param err 出错时返回错误详细信息
* @return 成功插入记录数
*/
size_t insert(const std::vector<record_scylla> & vec,
long long ttl, ed_error * err);

/**
* 删除数据
* @param vec 要删除的记录
* @param err 出错时返回错误详细信息
* @return 成功删除的记录数
*/
size_t remove(const std::vector<record_scylla> & vec, ed_error * err);
public:
/**
* 设置认证的用户名和密码
* @param user 用户名
* @param pass 密码
*/
void set_auth(const char * user, const char * pass);

/**
* 设置连接超时
* @param ms_timeout 超时时间,单位ms
*/
void set_conn_timeout(int ms_timeout);

/**
* 设置读写超时
* @param timeout 超时时间,单位ms
*/
void set_rw_timeout(int ms_timeout);

/**
* 设置scyllad 处理io的线程数
* @param cn 线程数
*/
void set_io_thread_num(int cn);
private:
/**
* 创建session连接
* @return 非NULL 成功连接,返回连接对象,NULL 连接失败
*/
CassSession* create_connect_session(ed_error *err);

/**
* 获取查询的字符串信息
* @param obj 要查询的对象信息
* @param bhot 是否是查询热点数据
* @param str 返回查询字符串
* @param cloNum 返回查询的字段个数
*/
void get_query_string(const scylla_obj & obj, bool bhot, acl::string & str);

/**
* 从row中解析一条记录到vec中
* @param row 要解析的行
* @param bhot 是否是查询热点数据
* @param vec 存储解析结果
* @return true 成功解析,false 解析失败
*/
bool parse_one_record(const CassRow* row, bool bhot,
std::vector<record_scylla> & vec);

/**
* 获取其中某一列的值
* @param row 行信息
* @param cname 列名
* @param value 返回获取到的value
* @return true 成功获取,false 获取失败
*/
bool get_col_value(const CassRow* row, const char * cname,
std::string & value);
bool get_col_value(const CassRow* row, const char * cname, int & value);

/**
* 获取删除条件字符串
* @param key 添加的条件字符串
* @param key_title 添加时使用的关键字
* @param all 返回添加后的结果
*/
void get_remove_cond_str(const std::string & key,
const char * key_title, std::string & all);
void get_remove_cond_int(int key, const char * key_title, std::string & all);

/**
* 从连接池池中获取一个连接
*/
CassSession * get_session(ed_error * err);

/**
* 归还使用后的session
* @param session 要归还的session
* @param bok false session 异常,true 正常
*/
void put_session(CassSession * session, bool bok);
private:
int port_; //端口号
int conn_timeout_; //连接超时
int rw_timeout_; //读写超时
int thread_num_; //处理io线程数
std::string addr_; //连接地址
std::string user_; //用户名
std::string pass_; //密码
CassSession * session_;
# QLock lock_; // pool_ 实际应用时需要考虑加 锁
};

} /* DB */

db_scylla.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
#include "stdafx.h"

#include "db_scylla.h"

namespace DB {

static const char * CONST_COL_URI = "uri";
static const char * CONST_COL_VIDC = "vidc";
static const char * CONST_COL_CACHE = "cache";
static const char * CONST_COL_TYPE = "typ";

static const char * CONST_COL_ZONE = "zone";
static const char * CONST_COL_LEVEL = "level";
static const char * KEYSPACE_TABLE="keyspace.test_table";

db_scylla::db_scylla()
: port_(9042)
, conn_timeout_(2000)
, rw_timeout_(2000)
, thread_num_(10)
, cluster_(NULL)
, session_(NULL)
{
}

db_scylla::~db_scylla() {
if(cluster_) {
cass_cluster_free(cluster_);
}
if(session_) {
cass_session_free(session_);
session_ = NULL;
}
}

bool db_scylla::open(const char * addr, int port) {
if(NULL==addr || port>65535 || port<=0) {
printf("open scylladb failed addr=%s, port=%d\n",
NULL==addr?"NULL":addr, port);
return false;
}

cluster_ = cass_cluster_new();
cass_cluster_set_port(cluster_, port);
cass_cluster_set_contact_points(cluster_, addr);
cass_cluster_set_credentials(cluster_, user_.c_str(), pass_.c_str());
cass_cluster_set_num_threads_io(cluster_, thread_num_); //处理请求的线程数
//cass_cluster_set_max_connections_per_host(cluster_, 50); //被废弃可不用了
//cass_cluster_set_core_connections_per_host(cluster_, 1); //每个线程的连接, 不用设置太多,默认的就够
return true;
}

void db_scylla::get_remove_cond_str(const std::string & key,
const char * key_title, std::string & all) {
if(key.empty()) {
return ;
}
acl::string tmp;
tmp.format("%s='%s'",key_title, key.c_str());
if(false == all.empty()) {
all.append(" AND ");
}
all.append(tmp.c_str());
}

void db_scylla::get_remove_cond_int(int key,
const char * key_title, std::string & all) {
if(key<0) {
return ;
}
acl::string tmp;
tmp.format("%s=%d",key_title, key);
if(false == all.empty()) {
all.append(" AND ");
}
all.append(tmp.c_str());
}

size_t db_scylla::remove(const std::vector<record_scylla> & vec, ed_error * err) {
size_t iok = 0;
CassSession * session = get_session(err);
if(NULL==session || vec.empty()) {
return iok;
}

CassFuture* futures[ vec.size() ];
for(size_t i=0; i<vec.size(); ++i) {
std::string strtmp;
get_remove_cond_str(vec[i].uri_, CONST_COL_URI, strtmp);
get_remove_cond_str(vec[i].vidc_, CONST_COL_VIDC, strtmp);
get_remove_cond_str(vec[i].cache_, CONST_COL_CACHE, strtmp);
get_remove_cond_int(vec[i].type_, CONST_COL_TYPE, strtmp);

acl::string qstr;
qstr.format("DELETE FROM cdn_global_index.uri_index WHERE %s ;", strtmp.c_str());
CassStatement * statement = cass_statement_new(qstr.c_str(), 0);
futures[i] = cass_session_execute(session, statement);
cass_statement_free(statement);
}

for (size_t i=0; i<vec.size(); ++i) {
CassFuture* future = futures[i];
cass_future_wait(future);
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
ADD_ED_ERROR(err, ERR_SDB_DELETE, "%s failed. err=%s", vec[i].uri_.c_str(),
cass_error_desc(rc));
} else {
iok++;
}
cass_future_free(future);
}
put_session(session, 0==iok?false:true);
return iok;
}

size_t db_scylla::insert(const std::vector<record_scylla> & vec, long long ttl,
ed_error * err) {
size_t iok = 0;
CassSession * session = get_session(err);
if(NULL==session || vec.empty()) {
return iok;
}
acl::string qstr;
if(ttl>0) {
qstr.format("INSERT INTO %s (%s, %s, %s, %s) VALUES(?,?,?,?) USING TTL %lld;",
KEYSPACE_TABLE, CONST_COL_URI, CONST_COL_VIDC, CONST_COL_CACHE, CONST_COL_TYPE, ttl);
} else {
qstr.format("INSERT INTO %s (%s, %s, %s, %s) VALUES(?,?,?,?) ;",
KEYSPACE_TABLE, CONST_COL_URI, CONST_COL_VIDC, CONST_COL_CACHE, CONST_COL_TYPE);
}
CassFuture* futures[ vec.size() ];
for(size_t i=0; i<vec.size(); ++i) {
CassStatement * statement = cass_statement_new(qstr.c_str(), 4);
cass_statement_bind_string(statement, 0, vec[i].uri_.c_str());
cass_statement_bind_string(statement, 1, vec[i].vidc_.c_str());
cass_statement_bind_string(statement, 2, vec[i].cache_.c_str());
cass_statement_bind_int32(statement, 3, vec[i].type_);

futures[i] = cass_session_execute(session, statement);
cass_statement_free(statement);
}

for (size_t i=0; i<vec.size(); ++i) {
CassFuture* future = futures[i];
cass_future_wait(future);
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
ADD_ED_ERROR(err, ERR_SDB_INSERT, "%s failed. err=%s", vec[i].uri_.c_str(),
cass_error_desc(rc));
} else {
iok++;
}
cass_future_free(future);
}
put_session(session, 0==iok?false:true);
return iok;
}

bool db_scylla::query(const scylla_obj & obj, bool bhot,
std::vector<record_scylla> & vec, ed_error * err) {
CassSession * session = get_session(err);
if(NULL==session) {
return false;
}
CassError rc = CASS_OK;
acl::string qstr;
get_query_string(obj, bhot, qstr);
CassStatement * statement = cass_statement_new(qstr.c_str(), 0);
CassFuture * future = cass_session_execute(session, statement);
if(CASS_OK!=(rc=cass_future_error_code(future))) {
ADD_ED_ERROR(err, ERR_SDB_QUERY, "query cass_future_wait failed. err=%s",
cass_error_desc(rc));
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
parse_one_record(row, bhot, vec);
}
cass_result_free(result);
cass_iterator_free(iterator);
}

cass_future_free(future);
cass_statement_free(statement);
put_session(session, rc==CASS_OK);
return rc==CASS_OK;
}


CassSession * db_scylla::get_session(ed_error * err) {
lock_.lock();
if(NULL==session_) {
session_ = create_connect_session(err);
}
lock_.unlock();
return session_;
CassSession * session = NULL;
lock_.lock();
if(pool_.empty()) {
lock_.unlock();
session = create_connect_session(err);
} else {
session = pool_.front();
pool_.pop_front();
lock_.unlock();
}
return session;
}

void db_scylla::put_session(CassSession * session, bool bok) {
return;
if(bok) {
lock_.lock();
pool_.push_back(session);
lock_.unlock();
} else {
cass_session_free(session);
}
}

void db_scylla::set_auth(const char * user, const char * pass) {
if(NULL==pass || NULL==user) {
logger_error("user or pass is NULL.");
return;
}
user_ = user;
pass_ = pass;
}

void db_scylla::set_conn_timeout(int ms_timeout) {
conn_timeout_ = ms_timeout;
}

void db_scylla::set_rw_timeout(int ms_timeout) {
rw_timeout_ = ms_timeout;
}

void db_scylla::set_io_thread_num(int cn) {
if(cn>0 && cn<1024) {
thread_num_ = cn;
}
}

CassSession* db_scylla::create_connect_session(ed_error* err){
CassSession* session = cass_session_new();
if(NULL==session) {
ADD_ED_ERROR(err, ERR_SDB_CREATE_SESSION, "create session failed.");
return NULL;
}
CassFuture* future = cass_session_connect(session, cluster_);
cass_future_wait(future);
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
ADD_ED_ERROR(err, ERR_SDB_CREATE_SESSION, "create session failed. err=%s",
cass_error_desc(rc));
cass_session_free(session);
session = NULL;
}
cass_future_free(future);
return session;
}

void db_scylla::get_query_string(const scylla_obj & obj, bool bhot,
acl::string & str) {
std::string tmpstr;
size_t maxPos = obj.vidc_.size()-1;
for(size_t i=0; i<obj.vidc_.size(); ++i) {
tmpstr.append("'").append(obj.vidc_[i]).append("'");
if(i!=maxPos) {
tmpstr.append(",");
}
}
if(bhot) {
if(tmpstr.empty()) {
str.format("SELECT %s,%s FROM %s WHERE uri='%s'",
CONST_COL_ZONE, CONST_COL_LEVEL, KEYSPACE_TABLE, obj.uri_.c_str());
} else {
str.format("SELECT %s,%s FROM %s WHERE uri='%s' AND zone in(%s)",
CONST_COL_ZONE, CONST_COL_LEVEL, KEYSPACE_TABLE,
obj.uri_.c_str(), tmpstr.c_str());
}
} else {
if(tmpstr.empty()) {
str.format("SELECT %s,%s,%s,%s FROM %s WHERE uri='%s'",
CONST_COL_URI, CONST_COL_VIDC, CONST_COL_CACHE, CONST_COL_TYPE,
KEYSPACE_TABLE, obj.uri_.c_str());
} else {
str.format("SELECT %s,%s,%s,%s FROM %s WHERE uri='%s' AND vidc in(%s)",
CONST_COL_URI, CONST_COL_VIDC, CONST_COL_CACHE, CONST_COL_TYPE,
KEYSPACE_TABLE, obj.uri_.c_str(), tmpstr.c_str());
}
}

}

bool db_scylla::parse_one_record(const CassRow* row, bool bhot,
std::vector<record_scylla> & vec) {
bool bret = true;
record_scylla record;
if(bhot) {
get_col_value(row, CONST_COL_ZONE, record.vidc_); //将zone 存放到vidc_中
get_col_value(row, CONST_COL_LEVEL, record.type_); //将level 存放到type中
} else {
get_col_value(row, CONST_COL_URI, record.uri_);
get_col_value(row, CONST_COL_VIDC, record.vidc_);
get_col_value(row, CONST_COL_CACHE, record.cache_);
// get_col_value(row, "zone", record.zone_);
get_col_value(row, CONST_COL_TYPE, record.type_);
}
vec.push_back(record);
return bret;
}

bool db_scylla::get_col_value(const CassRow* row, const char * cname,
std::string & value) {
bool bret = false;
const char* row_key;
size_t row_key_length;
if(CASS_OK==cass_value_get_string(
cass_row_get_column_by_name(row, cname),
&row_key, &row_key_length)) {
value = row_key;
bret = true;
}
return bret;
}

bool db_scylla::get_col_value(const CassRow* row, const char * cname,
int & value) {
bool bret = false;
if(CASS_OK==cass_value_get_int32(cass_row_get_column_by_name(row, cname),
&value)) {
bret = true;
}
return bret;
}

} /* DB */


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!