Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 0d8c716

Browse files
conradludgatepimeys
authored andcommitted
Connection changes (sfackler#21)
* refactor query_raw_txt to use a pre-prepared statement * expose ready_status on RowStream
1 parent 580ac24 commit 0d8c716

File tree

8 files changed

+104
-105
lines changed

8 files changed

+104
-105
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
- uses: actions/checkout@v3
5757
- uses: sfackler/actions/rustup@master
5858
with:
59-
version: 1.65.0
59+
version: 1.67.0
6060
- run: echo "::set-output name=version::$(rustc --version)"
6161
id: rust-version
6262
- run: rustup target add wasm32-unknown-unknown

tokio-postgres/src/client.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,13 +372,19 @@ impl Client {
372372

373373
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
374374
/// to save a roundtrip
375-
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
375+
pub async fn query_raw_txt<'a, T, S, I>(
376+
&self,
377+
statement: &T,
378+
params: I,
379+
) -> Result<RowStream, Error>
376380
where
377-
S: AsRef<str> + Sync + Send,
381+
T: ?Sized + ToStatement,
382+
S: AsRef<str>,
378383
I: IntoIterator<Item = Option<S>>,
379-
I::IntoIter: ExactSizeIterator + Sync + Send,
384+
I::IntoIter: ExactSizeIterator,
380385
{
381-
query::query_txt(&self.inner, query, params).await
386+
let statement = statement.__convert().into_statement(self).await?;
387+
query::query_txt(&self.inner, statement, params).await
382388
}
383389

384390
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ pub trait GenericClient: private::Sealed {
5757
I::IntoIter: ExactSizeIterator;
5858

5959
/// Like `Client::query_raw_txt`.
60-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
60+
async fn query_raw_txt<'a, T, S, I>(
61+
&self,
62+
statement: &T,
63+
params: I,
64+
) -> Result<RowStream, Error>
6165
where
66+
T: ?Sized + ToStatement + Sync + Send,
6267
S: AsRef<str> + Sync + Send,
6368
I: IntoIterator<Item = Option<S>> + Sync + Send,
6469
I::IntoIter: ExactSizeIterator + Sync + Send;
@@ -143,13 +148,14 @@ impl GenericClient for Client {
143148
self.query_raw(statement, params).await
144149
}
145150

146-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
151+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
147152
where
153+
T: ?Sized + ToStatement + Sync + Send,
148154
S: AsRef<str> + Sync + Send,
149155
I: IntoIterator<Item = Option<S>> + Sync + Send,
150156
I::IntoIter: ExactSizeIterator + Sync + Send,
151157
{
152-
self.query_raw_txt(query, params).await
158+
self.query_raw_txt(statement, params).await
153159
}
154160

155161
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
@@ -238,13 +244,14 @@ impl GenericClient for Transaction<'_> {
238244
self.query_raw(statement, params).await
239245
}
240246

241-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
247+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
242248
where
249+
T: ?Sized + ToStatement + Sync + Send,
243250
S: AsRef<str> + Sync + Send,
244251
I: IntoIterator<Item = Option<S>> + Sync + Send,
245252
I::IntoIter: ExactSizeIterator + Sync + Send,
246253
{
247-
self.query_raw_txt(query, params).await
254+
self.query_raw_txt(statement, params).await
248255
}
249256

250257
async fn prepare(&self, query: &str) -> Result<Statement, Error> {

tokio-postgres/src/query.rs

Lines changed: 36 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::prepare::get_type;
54
use crate::types::{BorrowToSql, IsNull};
6-
use crate::{Column, Error, Portal, Row, Statement};
5+
use crate::{Error, Portal, Row, Statement};
76
use bytes::{BufMut, Bytes, BytesMut};
8-
use fallible_iterator::FallibleIterator;
97
use futures_util::{ready, Stream};
108
use log::{debug, log_enabled, Level};
119
use pin_project_lite::pin_project;
1210
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
1311
use postgres_protocol::message::frontend;
14-
use postgres_types::Type;
12+
use postgres_types::Format;
1513
use std::fmt;
1614
use std::marker::PhantomPinned;
1715
use std::pin::Pin;
@@ -57,30 +55,29 @@ where
5755
statement,
5856
responses,
5957
command_tag: None,
58+
status: None,
59+
output_format: Format::Binary,
6060
_p: PhantomPinned,
6161
})
6262
}
6363

6464
pub async fn query_txt<S, I>(
6565
client: &Arc<InnerClient>,
66-
query: S,
66+
statement: Statement,
6767
params: I,
6868
) -> Result<RowStream, Error>
6969
where
70-
S: AsRef<str> + Sync + Send,
70+
S: AsRef<str>,
7171
I: IntoIterator<Item = Option<S>>,
7272
I::IntoIter: ExactSizeIterator,
7373
{
7474
let params = params.into_iter();
75-
let params_len = params.len();
7675

7776
let buf = client.with_buf(|buf| {
78-
// Parse, anonymous portal
79-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
8077
// Bind, pass params as text, retrieve as binary
8178
match frontend::bind(
8279
"", // empty string selects the unnamed portal
83-
"", // empty string selects the unnamed prepared statement
80+
statement.name(), // named prepared statement
8481
std::iter::empty(), // all parameters use the default format (text)
8582
params,
8683
|param, buf| match param {
@@ -98,8 +95,6 @@ where
9895
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
9996
}?;
10097

101-
// Describe portal to typecast results
102-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
10398
// Execute
10499
frontend::execute("", 0, buf).map_err(Error::encode)?;
105100
// Sync
@@ -108,43 +103,16 @@ where
108103
Ok(buf.split().freeze())
109104
})?;
110105

111-
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
112-
113106
// now read the responses
114-
115-
match responses.next().await? {
116-
Message::ParseComplete => {}
117-
_ => return Err(Error::unexpected_message()),
118-
}
119-
match responses.next().await? {
120-
Message::BindComplete => {}
121-
_ => return Err(Error::unexpected_message()),
122-
}
123-
let row_description = match responses.next().await? {
124-
Message::RowDescription(body) => Some(body),
125-
Message::NoData => None,
126-
_ => return Err(Error::unexpected_message()),
127-
};
128-
129-
// construct statement object
130-
131-
let parameters = vec![Type::UNKNOWN; params_len];
132-
133-
let mut columns = vec![];
134-
if let Some(row_description) = row_description {
135-
let mut it = row_description.fields();
136-
while let Some(field) = it.next().map_err(Error::parse)? {
137-
// NB: for some types that function may send a query to the server. At least in
138-
// raw text mode we don't need that info and can skip this.
139-
let type_ = get_type(client, field.type_oid()).await?;
140-
let column = Column::new(field.name().to_string(), type_, field);
141-
columns.push(column);
142-
}
143-
}
144-
145-
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
146-
147-
Ok(RowStream::new(statement, responses))
107+
let responses = start(client, buf).await?;
108+
Ok(RowStream {
109+
statement,
110+
responses,
111+
command_tag: None,
112+
status: None,
113+
output_format: Format::Text,
114+
_p: PhantomPinned,
115+
})
148116
}
149117

150118
pub async fn query_portal(
@@ -164,6 +132,8 @@ pub async fn query_portal(
164132
statement: portal.statement().clone(),
165133
responses,
166134
command_tag: None,
135+
status: None,
136+
output_format: Format::Binary,
167137
_p: PhantomPinned,
168138
})
169139
}
@@ -298,23 +268,13 @@ pin_project! {
298268
statement: Statement,
299269
responses: Responses,
300270
command_tag: Option<String>,
271+
output_format: Format,
272+
status: Option<u8>,
301273
#[pin]
302274
_p: PhantomPinned,
303275
}
304276
}
305277

306-
impl RowStream {
307-
/// Creates a new `RowStream`.
308-
pub fn new(statement: Statement, responses: Responses) -> Self {
309-
RowStream {
310-
statement,
311-
responses,
312-
command_tag: None,
313-
_p: PhantomPinned,
314-
}
315-
}
316-
}
317-
318278
impl Stream for RowStream {
319279
type Item = Result<Row, Error>;
320280

@@ -323,15 +283,22 @@ impl Stream for RowStream {
323283
loop {
324284
match ready!(this.responses.poll_next(cx)?) {
325285
Message::DataRow(body) => {
326-
return Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
286+
return Poll::Ready(Some(Ok(Row::new(
287+
this.statement.clone(),
288+
body,
289+
*this.output_format,
290+
)?)))
327291
}
328292
Message::EmptyQueryResponse | Message::PortalSuspended => {}
329293
Message::CommandComplete(body) => {
330294
if let Ok(tag) = body.tag() {
331295
*this.command_tag = Some(tag.to_string());
332296
}
333297
}
334-
Message::ReadyForQuery(_) => return Poll::Ready(None),
298+
Message::ReadyForQuery(status) => {
299+
*this.status = Some(status.status());
300+
return Poll::Ready(None);
301+
}
335302
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
336303
}
337304
}
@@ -345,4 +312,11 @@ impl RowStream {
345312
pub fn command_tag(&self) -> Option<String> {
346313
self.command_tag.clone()
347314
}
315+
316+
/// Returns if the connection is ready for querying, with the status of the connection.
317+
///
318+
/// This might be available only after the stream has been exhausted.
319+
pub fn ready_status(&self) -> Option<u8> {
320+
self.status
321+
}
348322
}

tokio-postgres/src/row.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ where
9898
/// A row of data returned from the database by a query.
9999
pub struct Row {
100100
statement: Statement,
101+
output_format: Format,
101102
body: DataRowBody,
102103
ranges: Vec<Option<Range<usize>>>,
103104
}
@@ -111,12 +112,17 @@ impl fmt::Debug for Row {
111112
}
112113

113114
impl Row {
114-
pub(crate) fn new(statement: Statement, body: DataRowBody) -> Result<Row, Error> {
115+
pub(crate) fn new(
116+
statement: Statement,
117+
body: DataRowBody,
118+
output_format: Format,
119+
) -> Result<Row, Error> {
115120
let ranges = body.ranges().collect().map_err(Error::parse)?;
116121
Ok(Row {
117122
statement,
118123
body,
119124
ranges,
125+
output_format,
120126
})
121127
}
122128

@@ -193,7 +199,7 @@ impl Row {
193199
///
194200
/// Useful when using query_raw_txt() which sets text transfer mode
195201
pub fn as_text(&self, idx: usize) -> Result<Option<&str>, Error> {
196-
if self.statement.output_format() == Format::Text {
202+
if self.output_format == Format::Text {
197203
match self.col_buffer(idx) {
198204
Some(raw) => {
199205
FromSql::from_sql(&Type::TEXT, raw).map_err(|e| Error::from_sql(e, idx))

tokio-postgres/src/statement.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use postgres_protocol::{
66
message::{backend::Field, frontend},
77
Oid,
88
};
9-
use postgres_types::Format;
109
use std::{
1110
fmt,
1211
sync::{Arc, Weak},
@@ -17,7 +16,6 @@ struct StatementInner {
1716
name: String,
1817
params: Vec<Type>,
1918
columns: Vec<Column>,
20-
output_format: Format,
2119
}
2220

2321
impl Drop for StatementInner {
@@ -51,22 +49,6 @@ impl Statement {
5149
name,
5250
params,
5351
columns,
54-
output_format: Format::Binary,
55-
}))
56-
}
57-
58-
pub(crate) fn new_text(
59-
inner: &Arc<InnerClient>,
60-
name: String,
61-
params: Vec<Type>,
62-
columns: Vec<Column>,
63-
) -> Statement {
64-
Statement(Arc::new(StatementInner {
65-
client: Arc::downgrade(inner),
66-
name,
67-
params,
68-
columns,
69-
output_format: Format::Text,
7052
}))
7153
}
7254

@@ -83,11 +65,6 @@ impl Statement {
8365
pub fn columns(&self) -> &[Column] {
8466
&self.0.columns
8567
}
86-
87-
/// Returns output format for the statement.
88-
pub fn output_format(&self) -> Format {
89-
self.0.output_format
90-
}
9168
}
9269

9370
/// Information about a column of a query.

tokio-postgres/src/transaction.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,14 @@ impl<'a> Transaction<'a> {
150150
}
151151

152152
/// Like `Client::query_raw_txt`.
153-
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
153+
pub async fn query_raw_txt<T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
154154
where
155-
S: AsRef<str> + Sync + Send,
155+
T: ?Sized + ToStatement,
156+
S: AsRef<str>,
156157
I: IntoIterator<Item = Option<S>>,
157-
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
I::IntoIter: ExactSizeIterator,
158159
{
159-
self.client.query_raw_txt(query, params).await
160+
self.client.query_raw_txt(statement, params).await
160161
}
161162

162163
/// Like `Client::execute`.

0 commit comments

Comments
 (0)