Refactor pull queries promisification

This commit is contained in:
Rogerio Chaves 2020-04-12 12:13:01 +02:00
parent 938db34a11
commit 850147ea2f
No known key found for this signature in database
GPG Key ID: E6AF5440509B1D94
3 changed files with 229 additions and 269 deletions

View File

@ -7,11 +7,10 @@ const debugPosts = require("debug")("queries:posts"),
debugPeople = require("debug")("queries:people"),
debugProfile = require("debug")("queries:profile");
const paramap = require("pull-paramap");
const { promisePull, mapValues } = require("./utils");
const latestOwnerValue = (ssbServer, { key, dest }) =>
new Promise((resolve, reject) => {
let value = null;
pull(
const latestOwnerValue = (ssbServer, { key, dest }) => {
return promisePull(
ssbServer.query.read({
reverse: true,
query: [
@ -32,25 +31,14 @@ const latestOwnerValue = (ssbServer, { key, dest }) =>
!(msg.value.content[key] && msg.value.content[key].remove)
);
}),
pull.take(1),
pull.drain(
(msg) => {
value = msg.value.content[key];
},
(err) => {
if (err) return reject(err);
if (!value) {
ssbServer.about
.latestValue({ key, dest })
.then(resolve)
.catch(reject);
} else {
resolve(value);
pull.take(1)
).then(([entry]) => {
if (entry) {
return entry.value.content[key];
}
}
)
);
return ssbServer.about.latestValue({ key, dest });
});
};
const mapProfiles = (ssbServer) => (data, callback) =>
getProfile(ssbServer, data.value.author)
@ -60,10 +48,10 @@ const mapProfiles = (ssbServer) => (data, callback) =>
})
.catch((err) => callback(err, null));
const getPosts = (ssbServer, profile) =>
debugPosts("Fetching") ||
new Promise((resolve, reject) => {
pull(
const getPosts = async (ssbServer, profile) => {
debugPosts("Fetching");
const posts = await promisePull(
// @ts-ignore
cat([
ssbServer.query.read({
@ -102,21 +90,17 @@ const getPosts = (ssbServer, profile) =>
}),
]),
pull.filter((msg) => msg.value.content.type == "post"),
paramap(mapProfiles(ssbServer)),
pull.collect((err, msgs) => {
debugPosts("Done");
const entries = msgs.map((x) => x.value);
if (err) return reject(err);
return resolve(entries);
})
paramap(mapProfiles(ssbServer))
);
});
debugPosts("Done");
return mapValues(posts);
};
const getVanishingMessages = async (ssbServer, profile) => {
debugMessages("Fetching");
const messagesPromise = new Promise((resolve, reject) => {
pull(
const messagesPromise = promisePull(
// @ts-ignore
cat([
ssbServer.query.read({
@ -156,19 +140,12 @@ const getVanishingMessages = async (ssbServer, profile) => {
pull.filter(
(msg) =>
msg.value.content.type == "post" &&
(msg.value.content.root ||
msg.value.content.recps.includes(profile.id))
(msg.value.content.root || msg.value.content.recps.includes(profile.id))
),
paramap(mapProfiles(ssbServer)),
pull.collect((err, msgs) => {
if (err) return reject(err);
return resolve(msgs);
})
paramap(mapProfiles(ssbServer))
);
});
const deletedPromise = new Promise((resolve, reject) => {
pull(
const deletedPromise = promisePull(
ssbServer.query.read({
reverse: true,
query: [
@ -183,13 +160,8 @@ const getVanishingMessages = async (ssbServer, profile) => {
},
},
],
}),
pull.collect((err, msgs) => {
if (err) return reject(err);
return resolve(Object.values(msgs));
})
);
});
).then(Object.values);
const [messages, deleted] = await Promise.all([
messagesPromise,
@ -200,10 +172,10 @@ const getVanishingMessages = async (ssbServer, profile) => {
return messages.filter((m) => !deletedIds.includes(m.key));
};
const searchPeople = (ssbServer, search) =>
debugPeople("Fetching") ||
new Promise((resolve, reject) => {
pull(
const searchPeople = async (ssbServer, search) => {
debugPeople("Fetching");
const people = await promisePull(
ssbServer.query.read({
reverse: true,
query: [
@ -225,22 +197,17 @@ const searchPeople = (ssbServer, search) =>
msg.value.author == msg.value.content.about &&
msg.value.content.name.includes(search)
);
}),
pull.collect((err, msgs) => {
debugPeople("Done");
const entries = msgs.map((x) => x.value);
if (err) return reject(err);
return resolve(Object.values(entries));
})
);
});
debugPeople("Done");
return Object.values(mapValues(people));
};
const getFriends = async (ssbServer, profile) => {
debugFriends("Fetching");
let contacts = await new Promise((resolve, reject) => {
pull(
let contacts = await promisePull(
// @ts-ignore
cat([
ssbServer.query.read({
@ -275,15 +242,8 @@ const getFriends = async (ssbServer, profile) => {
],
limit: 100,
}),
]),
pull.collect((err, msgs) => {
const entries = msgs.map((x) => x.value);
if (err) return reject(err);
return resolve(entries);
})
);
});
])
).then(mapValues);
let network = {};
let requestRejections = [];
@ -340,8 +300,7 @@ const getFriends = async (ssbServer, profile) => {
const getFriendshipStatus = async (ssbServer, source, dest) => {
debugFriendshipStatus("Fetching");
let requestRejectionsPromise = new Promise((resolve, reject) => {
pull(
let requestRejectionsPromise = promisePull(
ssbServer.query.read({
reverse: true,
query: [
@ -358,15 +317,8 @@ const getFriendshipStatus = async (ssbServer, source, dest) => {
},
],
limit: 100,
}),
pull.collect((err, msgs) => {
const entries = msgs.map((x) => x.value);
if (err) return reject(err);
return resolve(entries);
})
);
});
).then(mapValues);
const [isFollowing, isFollowingBack, requestRejections] = await Promise.all([
ssbServer.friends.isFollowing({ source: source, dest: dest }),
@ -391,8 +343,7 @@ const getFriendshipStatus = async (ssbServer, source, dest) => {
return status;
};
const getAllEntries = (ssbServer, query) =>
new Promise((resolve, reject) => {
const getAllEntries = (ssbServer, query) => {
let queries = [];
if (query.author) {
queries.push({ $filter: { value: { author: query.author } } });
@ -402,18 +353,14 @@ const getAllEntries = (ssbServer, query) =>
}
const queryOpts = queries.length > 0 ? { query: queries } : {};
pull(
return promisePull(
ssbServer.query.read({
reverse: true,
limit: 500,
...queryOpts,
}),
pull.collect((err, msgs) => {
if (err) return reject(err);
return resolve(msgs);
})
);
});
};
let profileCache = {};
const getProfile = async (ssbServer, id) => {

View File

@ -80,3 +80,16 @@ module.exports.uploadPicture = async (ssbServer, picture) => {
)
);
};
module.exports.promisePull = (...streams) =>
new Promise((resolve, reject) => {
pull(
...streams,
pull.collect((err, msgs) => {
if (err) return reject(err);
return resolve(msgs);
})
);
});
module.exports.mapValues = (x) => x.map((y) => y.value);

View File

@ -17,7 +17,7 @@ messages.forEach((message) => {
const onClose = () => {
const parent = modal.parentElement;
parent.parentElement.removeChild(parent);
if (document.querySelectorAll(".vanishing-message").length == 0) {
if (document.querySelectorAll(".js-vanishing-message").length == 0) {
document.querySelector(".js-vanishing-messages").style.display = "none";
}
};