SQLite examples
#!/usr/bin/env python3
"""
Read raw messages from a supported sensor and store them on a sqlite DB.
After reading the sensor, decode all messages on DB and print them.
- PMSx003 sensor on /dev/ttyUSB0 by default
- read 4 samples for each sensor, by default
- read one sample from each sensor every 20 seconds, by default
"""
import sqlite3
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Callable, ContextManager, Iterator
from typer import Argument, Option, Typer, progressbar
from pms.core import Sensor, SensorReader, Supported
from pms.core.reader import ObsData, RawData
app = Typer(add_completion=False)
@app.command()
def main(
    model: Supported = Argument(Supported.default, help="sensor model"),
    port: str = Argument("/dev/ttyUSB0", help="serial port"),
    db_path: Path = Argument(Path("pypms.sqlite"), help="sensor messages DB"),
    samples: int = Option(4, "--samples", "-n"),
    interval: int = Option(20, "--interval", "-i"),
):
    """
    Read raw messages from a supported sensor and store them on a sqlite DB.
    After reading the sensor, decode all messages on DB and print them.
    """
    # connection factory for the messages DB
    connect = pypms_db(db_path)
    sensor = Sensor[model]

    # acquisition phase: store each raw message as soon as it is read
    with connect() as db:
        with SensorReader(sensor, port, interval, samples) as reader:
            raw_messages = reader(raw=True)
            with progressbar(raw_messages, length=samples, label=f"reading {sensor}") as bar:
                for raw in bar:
                    write_message(db, sensor, raw)

    # report phase: decode and print every message stored for `sensor`
    with connect() as db:
        print(sensor)
        for decoded in read_obs(db, sensor):
            print(decoded)
def pypms_db(db_path: Path) -> Callable[[], ContextManager[sqlite3.Connection]]:
    """
    Create the DB and the messages table, if they do not exist already,
    and return a context manager factory for DB connections.

    Every call to the returned `connect` opens a new connection to `db_path`
    and closes it when the `with` block ends. A `sqlite3.Error` raised inside
    the `with` block terminates the program through `SystemExit`, carrying
    the error as exit message.
    """

    @contextmanager
    def connect() -> Iterator[sqlite3.Connection]:
        db = sqlite3.connect(str(db_path))
        try:
            yield db
        except sqlite3.Error as e:
            # raise SystemExit directly: the `exit()` helper is injected by the
            # `site` module for interactive use and is missing under `python -S`
            raise SystemExit(e) from e
        finally:
            db.close()

    create_table = """
        CREATE TABLE IF NOT EXISTS messages (
            time DATETIME NOT NULL,
            sensor TEXT NOT NULL,
            message BLOB NOT NULL,
            UNIQUE (time, sensor)
        );
        """
    # `db` as a context manager wraps the statement in a transaction,
    # `closing` releases the cursor afterwards
    with connect() as db, db, closing(db.cursor()) as cur:
        cur.executescript(create_table)
    return connect
def write_message(db: sqlite3.Connection, sensor: Sensor, message: RawData):
    """
    Store one raw sensor message on the messages table.

    The row is keyed by (message.time, sensor.name); a message with
    an already stored key is silently skipped (INSERT OR IGNORE).
    """
    sql = """
        INSERT OR IGNORE INTO messages (time, sensor, message)
        VALUES (?, ?, ?);
        """
    row = (message.time, sensor.name, message.data)
    # the connection context commits the insert, or rolls it back on error
    with db:
        with closing(db.cursor()) as cursor:
            cursor.execute(sql, row)
def read_obs(db: sqlite3.Connection, sensor: Sensor) -> Iterator[ObsData]:
    """
    Fetch the stored messages of `sensor`, ordered by time,
    and return an iterator over the decoded observations.

    All rows are fetched before the cursor is closed;
    decoding happens lazily, as the returned iterator is consumed.
    """
    query = """
        SELECT
            message, time
        FROM
            messages
        WHERE
            sensor IS ?
        ORDER BY
            time;
        """
    with closing(db.cursor()) as cursor:
        cursor.execute(query, (sensor.name,))
        rows = cursor.fetchall()
    return (sensor.decode(message, time=time) for message, time in rows)
# script entry point: run the Typer app
if __name__ == "__main__":
    try:
        app()
    except KeyboardInterrupt:
        # interrupted by the user: print an empty line instead of a traceback
        print("")
#!/usr/bin/env python3
"""
Read raw messages from 2 different sensors and store them on a sqlite DB.
After reading the sensor, decode all messages on DB and print them.
- PMSx003 sensor on /dev/ttyUSB0
- MCU680 sensor on /dev/ttyUSB1
- read 4 samples for each sensor, by default
- read one sample from each sensor every 20 seconds, by default
NOTE:
When reading 2 or more sensors only the timing of the first sensor is guaranteed.
In this example, the second sensor will be read right after the first sensor.
"""
import sqlite3
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Callable, ContextManager, Iterator
from typer import Argument, Option, Typer, progressbar
from pms.core import Sensor, SensorReader
from pms.core.reader import ObsData, RawData
app = Typer(add_completion=False)
@app.command()
def main(
    db_path: Path = Argument(Path("pypms.sqlite"), help="sensor messages DB"),
    samples: int = Option(4, "--samples", "-n"),
    interval: int = Option(20, "--interval", "-i"),
):
    """
    Read raw messages from 2 different sensors
    (PMSx003 on /dev/ttyUSB0 and MCU680 on /dev/ttyUSB1)
    and store them on a sqlite DB.
    After reading the sensor, decode all messages on DB and print them.
    """
    # connection factory for the messages DB
    connect = pypms_db(db_path)

    reader = {
        "pms": SensorReader("PMSx003", "/dev/ttyUSB0", interval, samples),
        "bme": SensorReader("MCU680", "/dev/ttyUSB1", interval, samples),
    }

    # acquisition phase: both readers are consumed in lockstep
    with connect() as db:
        with reader["pms"] as pms, reader["bme"] as bme:
            paired = zip(pms(raw=True), bme(raw=True))
            with progressbar(paired, length=samples, label="reading sensors") as bar:
                for pms_raw, bme_raw in bar:
                    write_message(db, pms.sensor, pms_raw)
                    write_message(db, bme.sensor, bme_raw)

    # report phase: decode and print the stored messages, sensor by sensor
    with connect() as db:
        sensors = [r.sensor for r in reader.values()]
        for sensor in sensors:
            print(sensor)
            for decoded in read_obs(db, sensor):
                print(decoded)
def pypms_db(db_path: Path) -> Callable[[], ContextManager[sqlite3.Connection]]:
    """
    Create the DB and the messages table, if they do not exist already,
    and return a context manager factory for DB connections.

    Every call to the returned `connect` opens a new connection to `db_path`
    and closes it when the `with` block ends. A `sqlite3.Error` raised inside
    the `with` block terminates the program through `SystemExit`, carrying
    the error as exit message.
    """

    @contextmanager
    def connect() -> Iterator[sqlite3.Connection]:
        db = sqlite3.connect(str(db_path))
        try:
            yield db
        except sqlite3.Error as e:
            # raise SystemExit directly: the `exit()` helper is injected by the
            # `site` module for interactive use and is missing under `python -S`
            raise SystemExit(e) from e
        finally:
            db.close()

    create_table = """
        CREATE TABLE IF NOT EXISTS messages (
            time DATETIME NOT NULL,
            sensor TEXT NOT NULL,
            message BLOB NOT NULL,
            UNIQUE (time, sensor)
        );
        """
    # `db` as a context manager wraps the statement in a transaction,
    # `closing` releases the cursor afterwards
    with connect() as db, db, closing(db.cursor()) as cur:
        cur.executescript(create_table)
    return connect
def write_message(db: sqlite3.Connection, sensor: Sensor, message: RawData):
    """
    Store one raw sensor message on the messages table.

    The row is keyed by (message.time, sensor.name); a message with
    an already stored key is silently skipped (INSERT OR IGNORE).
    """
    sql = """
        INSERT OR IGNORE INTO messages (time, sensor, message)
        VALUES (?, ?, ?);
        """
    row = (message.time, sensor.name, message.data)
    # the connection context commits the insert, or rolls it back on error
    with db:
        with closing(db.cursor()) as cursor:
            cursor.execute(sql, row)
def read_obs(db: sqlite3.Connection, sensor: Sensor) -> Iterator[ObsData]:
    """
    Fetch the stored messages of `sensor`, ordered by time,
    and return an iterator over the decoded observations.

    All rows are fetched before the cursor is closed;
    decoding happens lazily, as the returned iterator is consumed.
    """
    query = """
        SELECT
            message, time
        FROM
            messages
        WHERE
            sensor IS ?
        ORDER BY
            time;
        """
    with closing(db.cursor()) as cursor:
        cursor.execute(query, (sensor.name,))
        rows = cursor.fetchall()
    return (sensor.decode(message, time=time) for message, time in rows)
# script entry point: run the Typer app
if __name__ == "__main__":
    try:
        app()
    except KeyboardInterrupt:
        # interrupted by the user: print an empty line instead of a traceback
        print("")
#!/usr/bin/env python3
"""
Read measurements from 2 different sensors and store them
on a sqlite DB as a "tall table" with a "wide table" view for each sensor.
After reading the sensor, get all measurements from the DB and print them by sensor.
- PMSx003 sensor on /dev/ttyUSB0
- MCU680 sensor on /dev/ttyUSB1
- read 4 samples for each sensor, by default
- read one sample from each sensor every 20 seconds, by default
NOTE:
the read_obs function creates a subclass of sensor.Data in order to avoid the
call to __post_init__, as this was already done when the sensor message was decoded.
Please open an issue or submit a PR if you know of a cleaner way to achieve this.
"""
import sqlite3
from contextlib import closing, contextmanager
from dataclasses import asdict, dataclass, fields
from pathlib import Path
from typing import Callable, ContextManager, Iterator
from typer import Argument, Option, Typer, progressbar
from pms.core import Sensor, SensorReader
from pms.core.reader import ObsData
app = Typer(add_completion=False)
@app.command()
def main(
    db_path: Path = Argument(Path("pypms.sqlite"), help="sensor measurements DB"),
    samples: int = Option(4, "--samples", "-n"),
    interval: int = Option(20, "--interval", "-i"),
):
    """
    Read measurements from 2 different sensors
    (PMSx003 on /dev/ttyUSB0 and MCU680 on /dev/ttyUSB1)
    and store them on a sqlite DB as a "tall table" with a "wide table" view for each sensor.
    After reading the sensors, get all measurements from the DB and print them by sensor.
    """
    # NOTE: the docstring above is user-facing text (CLI help), keep it free of typos

    # get DB context manager
    measurements_db = pypms_db(db_path)

    reader = dict(
        pms=SensorReader("PMSx003", "/dev/ttyUSB0", interval, samples),
        bme=SensorReader("MCU680", "/dev/ttyUSB1", interval, samples),
    )

    # read from each sensor and write to DB
    with measurements_db() as db, reader["pms"] as pms, reader["bme"] as bme:
        # read one obs from each sensor at a time, both readers advance in lockstep
        with progressbar(zip(pms(), bme()), length=samples, label="reading sensors") as progress:
            for pms_obs, env_obs in progress:
                write_measurements(db, pms.sensor, pms_obs)
                write_measurements(db, bme.sensor, env_obs)

    # read all measurements on the DB and reconstruct sensor.Data objects
    with measurements_db() as db:
        # extract obs from one sensor at a time
        for sensor in [r.sensor for r in reader.values()]:
            print(sensor)
            for obs in read_obs(db, sensor):
                print(obs)
def pypms_db(db_path: Path) -> Callable[[], ContextManager[sqlite3.Connection]]:
    """
    Create the DB, the measurements table and one view per sensor,
    if they do not exist already, and return a context manager factory
    for DB connections.

    Every call to the returned `connect` opens a new connection to `db_path`
    and closes it when the `with` block ends. A `sqlite3.Error` raised inside
    the `with` block terminates the program through `SystemExit`, carrying
    the error as exit message.
    """

    @contextmanager
    def connect() -> Iterator[sqlite3.Connection]:
        db = sqlite3.connect(str(db_path))
        try:
            yield db
        except sqlite3.Error as e:
            # raise SystemExit directly: the `exit()` helper is injected by the
            # `site` module for interactive use and is missing under `python -S`
            raise SystemExit(e) from e
        finally:
            db.close()

    create_table = """
        CREATE TABLE IF NOT EXISTS measurements (
            time DATETIME NOT NULL,
            sensor TEXT NOT NULL,
            field TEXT NOT NULL,
            value NUMERIC NOT NULL,
            UNIQUE (time, sensor, field)
        );
        """
    with connect() as db, db, closing(db.cursor()) as cur:
        cur.executescript(create_table)

        # create a "wide table" view for every supported sensor:
        # one row per time, one column per field of sensor.Data (other than time).
        # View and column names are interpolated into the SQL text; they come from
        # the members of `Sensor` and the fields of their `Data` dataclass.
        for sensor in Sensor:
            view_fields = ",\n".join(
                f"MAX(CASE WHEN field='{field.name}' THEN value ELSE NULL END) {field.name}"
                for field in fields(sensor.Data)
                if field.name != "time"
            )
            sensor_view = f"""
                CREATE VIEW IF NOT EXISTS {sensor.name} AS
                SELECT
                    MAX(time) time,
                    {view_fields}
                FROM
                    measurements
                WHERE
                    sensor IS '{sensor.name}'
                GROUP BY
                    time
                ORDER BY
                    time;
                """
            cur.executescript(sensor_view)
    return connect
def write_measurements(db: sqlite3.Connection, sensor: Sensor, obs: ObsData):
    """
    Store one observation on the measurements table, one row per field.

    Every field of `obs` other than "time" becomes a (time, sensor, field, value) row;
    rows with an already stored (time, sensor, field) key are silently skipped.
    """
    sql = """
        INSERT OR IGNORE INTO measurements (time, sensor, field, value)
        VALUES (?, ?, ?, ?);
        """
    rows = [
        (obs.time, sensor.name, name, value)
        for name, value in asdict(obs).items()
        if name != "time"
    ]
    # the connection context commits all rows together, or rolls them back on error
    with db:
        with closing(db.cursor()) as cursor:
            cursor.executemany(sql, rows)
def read_obs(db: sqlite3.Connection, sensor: Sensor) -> Iterator[ObsData]:
    """
    Fetch all rows from the view named after `sensor`
    and return an iterator of observations rebuilt from them.

    Observations are instances of a `sensor.Data` subclass with a no-op
    `__post_init__`, so stored values are taken as they are.
    """

    @dataclass
    class NewObs(sensor.Data):  # type: ignore[name-defined]
        def __post_init__(self):
            """skip post-init, it already ran when the message was decoded"""

    query = f"SELECT * FROM {sensor.name};"
    with closing(db.cursor()) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    # row columns are passed positionally to the dataclass constructor
    return (NewObs(*row) for row in rows)
# script entry point: run the Typer app
if __name__ == "__main__":
    try:
        app()
    except KeyboardInterrupt:
        # interrupted by the user: print an empty line instead of a traceback
        print("")