| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| const { Pool } = require('pg'); |
| require('dotenv').config(); |
|
|
| const pool = new Pool({ connectionString: process.env.DATABASE_URL }); |
|
|
| module.exports = { |
| query: (text, params) => pool.query(text, params), |
| pool |
| }; |
|
|
| |
| |
| |
|
|
| |
// Pub/sub module: a Redis-backed PubSub engine shared by mutations
// (publishers) and subscriptions (consumers).
const { RedisPubSub } = require('graphql-redis-subscriptions');
const Redis = require('ioredis');
require('dotenv').config();

// Reconnect back-off: the value grows by 50 per attempt and is capped at 2000
// (ioredis treats the returned number as the delay in milliseconds).
const options = {
  retryStrategy: (times) => Math.min(times * 50, 2000),
};

/**
 * Opens one Redis connection to REDIS_URL with the shared options.
 * Publisher and subscriber each get their own connection.
 */
function createRedisClient() {
  return new Redis(process.env.REDIS_URL, options);
}

const pubsub = new RedisPubSub({
  publisher: createRedisClient(),
  subscriber: createRedisClient(),
});

module.exports = pubsub;
|
|
| |
| |
| |
|
|
| |
// GraphQL schema (SDL) for infrastructure faults and their payouts.
const { gql } = require('apollo-server');

// Notes on the schema as written:
//  - `location` is exposed as the free-form JSON scalar; the `Location` type
//    is declared but no field in this schema references it.
//  - Monetary amounts are integers in minor units (`amountMinorUnits`).
//  - Each Subscription field carries the full object that was created/updated.
const typeDefs = gql`
  scalar DateTime
  scalar JSON

  enum Namespace { infrastructure satellite }

  type Location { lat: Float lon: Float }

  type Provenance { source: String! license: String retrieved_at: DateTime! }

  type InfrastructureFault {
    id: ID!
    namespace: Namespace!
    type: String!
    timestamp: DateTime!
    location: JSON
    severity: Int!
    confirmed: Boolean!
    images: [String]
    provenance: Provenance
  }

  type Payout {
    id: ID!
    faultId: ID!
    amountMinorUnits: Int!
    currency: String!
    payeeId: String!
    status: String!
    createdAt: DateTime!
    settledAt: DateTime
    txRef: String
  }

  input IngestFaultInput {
    namespace: Namespace!
    type: String!
    timestamp: DateTime!
    location: JSON
    severity: Int!
    images: [String]
    provenance: JSON
  }

  input CreatePayoutInput {
    faultId: ID!
    amountMinorUnits: Int!
    currency: String!
    payeeId: String!
  }

  type Query {
    listInfraFaults(limit: Int = 50, offset: Int = 0): [InfrastructureFault!]
    payoutsForFault(faultId: ID!): [Payout!]
  }

  type Mutation {
    ingestFault(input: IngestFaultInput!): InfrastructureFault!
    confirmFault(id: ID!, confirmed: Boolean!): InfrastructureFault!
    createPayout(input: CreatePayoutInput!): Payout!
    settlePayout(payoutId: ID!): Payout!
  }

  type Subscription {
    faultCreated: InfrastructureFault!
    faultConfirmed: InfrastructureFault!
    payoutUpdated: Payout!
  }
`;

module.exports = typeDefs;
|
|
| |
| |
| |
|
|
| |
// Resolver module dependencies.
const { GraphQLScalarType, Kind } = require('graphql');
const db = require('./db');
const pubsub = require('./pubsub');
const { v4: uuidv4 } = require('uuid');

// Pub/sub topic names: each mutation publishes to one of these and the
// matching Subscription field listens on it.
const FAULT_CREATED = 'FAULT_CREATED';
const FAULT_CONFIRMED = 'FAULT_CONFIRMED';
const PAYOUT_UPDATED = 'PAYOUT_UPDATED';
|
|
/**
 * Coerces a value to a Date and rejects anything that does not parse.
 * @param {*} value - A Date, or anything accepted by the Date constructor.
 * @returns {Date} A valid Date instance.
 * @throws {TypeError} When the resulting date is invalid.
 */
function toValidDate(value) {
  const date = value instanceof Date ? value : new Date(value);
  if (Number.isNaN(date.getTime())) {
    throw new TypeError(`DateTime cannot represent an invalid date-time value: ${String(value)}`);
  }
  return date;
}

/**
 * `DateTime` scalar: Date objects internally, ISO-8601 strings on the wire.
 *
 * Invalid input is rejected up front. Previously an unparseable string became
 * an `Invalid Date` that travelled on to the database, and a non-string
 * literal silently turned into `null`.
 */
const DateTime = new GraphQLScalarType({
  name: 'DateTime',
  description: 'ISO date-time scalar',
  // Value supplied through query variables.
  parseValue: (value) => toValidDate(value),
  // Value leaving the server; accepts a Date or anything Date can parse.
  serialize: (value) => toValidDate(value).toISOString(),
  // Value written inline in the query document; only string literals qualify.
  parseLiteral(ast) {
    if (ast.kind !== Kind.STRING) {
      throw new TypeError(`DateTime must be written as a string literal, got ${ast.kind}`);
    }
    return toValidDate(ast.value);
  },
});
|
|
/**
 * Converts a GraphQL value AST node into the plain JS value it denotes.
 *
 * Handles nested object and list literals as well as variables, so an inline
 * argument such as `location: { lat: 1.5, lon: 2 }` is preserved instead of
 * collapsing to null.
 *
 * @param {object} ast - Value node from the query document.
 * @param {object} [variables] - Variable values for the current operation.
 * @returns {*} The decoded value; null for node kinds that are not handled.
 */
function parseJsonLiteral(ast, variables) {
  switch (ast.kind) {
    case Kind.STRING:
      return ast.value;
    case Kind.BOOLEAN:
      // The parser stores a real boolean here; the string form is still
      // accepted for compatibility with the previous comparison.
      return ast.value === true || ast.value === 'true';
    case Kind.INT:
      return parseInt(ast.value, 10);
    case Kind.FLOAT:
      return parseFloat(ast.value);
    case Kind.NULL:
      return null;
    case Kind.LIST:
      return ast.values.map((item) => parseJsonLiteral(item, variables));
    case Kind.OBJECT:
      // fromEntries defines own properties, so a field named "__proto__"
      // cannot replace the object's prototype.
      return Object.fromEntries(
        ast.fields.map((field) => [field.name.value, parseJsonLiteral(field.value, variables)])
      );
    case Kind.VARIABLE:
      return variables ? variables[ast.name.value] : undefined;
    default:
      return null;
  }
}

/**
 * `JSON` scalar: passes values through untouched in both directions and
 * decodes inline literals with `parseJsonLiteral`.
 */
const JSONScalar = new GraphQLScalarType({
  name: 'JSON',
  description: 'Arbitrary JSON value',
  parseValue: (value) => value,
  serialize: (value) => value,
  parseLiteral: (ast, variables) => parseJsonLiteral(ast, variables),
});
|
|
| const resolvers = { |
| DateTime, |
| JSON: JSONScalar, |
|
|
| Query: { |
| listInfraFaults: async (_, { limit, offset }) => { |
| const r = await db.query('SELECT * FROM objects WHERE type=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3', ['pothole-detection', limit, offset]); |
| return r.rows.map(r => ({ |
| id: r.id, |
| namespace: r.namespace, |
| type: r.type, |
| timestamp: r.timestamp, |
| location: r.location, |
| severity: r.severity, |
| confirmed: r.confirmed, |
| images: r.images, |
| provenance: r.provenance |
| })); |
| }, |
| payoutsForFault: async (_, { faultId }) => { |
| const r = await db.query('SELECT * FROM payouts WHERE fault_id=$1 ORDER BY created_at DESC', [faultId]); |
| return r.rows.map(p => ({ |
| id: p.id, |
| faultId: p.fault_id, |
| amountMinorUnits: parseInt(p.amount_minor_units, 10), |
| currency: p.currency, |
| payeeId: p.payee_id, |
| status: p.status, |
| createdAt: p.created_at, |
| settledAt: p.settled_at, |
| txRef: p.tx_ref |
| })); |
| } |
| }, |
|
|
| Mutation: { |
| ingestFault: async (_, { input }) => { |
| const id = uuidv4(); |
| const q = `INSERT INTO objects (id, namespace, type, timestamp, location, severity, images, provenance) |
| VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING *`; |
| const vals = [id, input.namespace, input.type, input.timestamp || new Date().toISOString(), JSON.stringify(input.location || null), input.severity, JSON.stringify(input.images || []), input.provenance || {}]; |
| const r = await db.query(q, vals); |
| const obj = r.rows[0]; |
| const payload = { |
| id: obj.id, |
| namespace: obj.namespace, |
| type: obj.type, |
| timestamp: obj.timestamp, |
| location: obj.location, |
| severity: obj.severity, |
| confirmed: obj.confirmed, |
| images: obj.images, |
| provenance: obj.provenance |
| }; |
| await pubsub.publish(FAULT_CREATED, payload); |
| return payload; |
| }, |
|
|
| confirmFault: async (_, { id, confirmed }) => { |
| const q = 'UPDATE objects SET confirmed=$1 WHERE id=$2 RETURNING *'; |
| const r = await db.query(q, [confirmed, id]); |
| if (r.rowCount === 0) throw new Error('fault not found'); |
| const obj = r.rows[0]; |
| const payload = { |
| id: obj.id, |
| namespace: obj.namespace, |
| type: obj.type, |
| timestamp: obj.timestamp, |
| location: obj.location, |
| severity: obj.severity, |
| confirmed: obj.confirmed, |
| images: obj.images, |
| provenance: obj.provenance |
| }; |
| await pubsub.publish(FAULT_CONFIRMED, payload); |
| return payload; |
| }, |
|
|
| createPayout: async (_, { input }) => { |
| const id = uuidv4(); |
| const q = `INSERT INTO payouts (id, fault_id, amount_minor_units, currency, payee_id, status) |
| VALUES ($1,$2,$3,$4,$5,'created') RETURNING *`; |
| const vals = [id, input.faultId, input.amountMinorUnits, input.currency, input.payeeId]; |
| const r = await db.query(q, vals); |
| const p = r.rows[0]; |
| const payload = { |
| id: p.id, |
| faultId: p.fault_id, |
| amountMinorUnits: parseInt(p.amount_minor_units, 10), |
| currency: p.currency, |
| payeeId: p.payee_id, |
| status: p.status, |
| createdAt: p.created_at, |
| settledAt: p.settled_at, |
| txRef: p.tx_ref |
| }; |
| await pubsub.publish(PAYOUT_UPDATED, payload); |
| return payload; |
| }, |
|
|
| settlePayout: async (_, { payoutId }) => { |
| |
| const client = await db.pool.connect(); |
| try { |
| await client.query('BEGIN'); |
| const r = await client.query('SELECT * FROM payouts WHERE id=$1 FOR UPDATE', [payoutId]); |
| if (r.rowCount === 0) throw new Error('payout not found'); |
| const payout = r.rows[0]; |
| if (payout.status !== 'created' && payout.status !== 'processing') { |
| await client.query('ROLLBACK'); |
| return { |
| id: payout.id, |
| faultId: payout.fault_id, |
| amountMinorUnits: parseInt(payout.amount_minor_units, 10), |
| currency: payout.currency, |
| payeeId: payout.payee_id, |
| status: payout.status, |
| createdAt: payout.created_at, |
| settledAt: payout.settled_at, |
| txRef: payout.tx_ref |
| }; |
| } |
| await client.query('UPDATE payouts SET status=$1 WHERE id=$2', ['processing', payoutId]); |
| |
| const fakeTx = `TX-${Date.now()}-${Math.floor(Math.random()*1000)}`; |
| await client.query('UPDATE payouts SET status=$1, tx_ref=$2, settled_at=now() WHERE id=$3', ['settled', fakeTx, payoutId]); |
| await client.query('COMMIT'); |
|
|
| const updated = (await db.query('SELECT * FROM payouts WHERE id=$1', [payoutId])).rows[0]; |
| const payload = { |
| id: updated.id, |
| faultId: updated.fault_id, |
| amountMinorUnits: parseInt(updated.amount_minor_units, 10), |
| currency: updated.currency, |
| payeeId: updated.payee_id, |
| status: updated.status, |
| createdAt: updated.created_at, |
| settledAt: updated.settled_at, |
| txRef: updated.tx_ref |
| }; |
| await pubsub.publish(PAYOUT_UPDATED, payload); |
| return payload; |
| } catch (err) { |
| await client.query('ROLLBACK'); |
| throw err; |
| } finally { |
| client.release(); |
| } |
| } |
| }, |
|
|
| Subscription: { |
| faultCreated: { subscribe: () => pubsub.asyncIterator([FAULT_CREATED]) }, |
| faultConfirmed: { subscribe: () => pubsub.asyncIterator([FAULT_CONFIRMED]) }, |
| payoutUpdated: { subscribe: () => pubsub.asyncIterator([PAYOUT_UPDATED]) } |
| } |
| }; |
|
|
// The resolver map is this module's only export.
module.exports = resolvers;
|
|
| |
| |
| |
|
|
| |
// Server entry point: wires the schema and resolvers into Apollo Server.
const { ApolloServer } = require('apollo-server');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');
require('dotenv').config();

/**
 * Creates the Apollo server and starts listening.
 * The port comes from the PORT env var and falls back to 4000.
 * @returns {Promise<void>} Resolves once the server is accepting requests.
 */
async function start() {
  const server = new ApolloServer({ typeDefs, resolvers });
  const { url } = await server.listen({ port: process.env.PORT || 4000 });
  console.log(`🚀 Server ready at ${url}`);
}

// Surface startup failures (e.g. port already in use) instead of leaving the
// promise unhandled. Exit explicitly, since other open handles could
// otherwise keep the process alive.
start().catch((err) => {
  console.error('Server failed to start', err);
  process.exit(1);
});
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|