Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "ftl/p2p-rm.hpp"
#include "ftl/p2p-rm/blob.hpp"
#include "ftl/p2p-rm/protocol.hpp"
#include <ftl/p2p-rm/cluster.hpp>
#include <ftl/net.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/net/socket.hpp>
#include <map>
#include <string>
#include <iostream>
using ftl::rm::Cluster;
using ftl::net::Listener;
using std::map;
using std::shared_ptr;
using ftl::URI;
using ftl::rm::Blob;
using ftl::net::Socket;
Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) {
auto me = this;
root_ = uri.getHost();
if (l != nullptr) {
l->onConnection([&](shared_ptr<Socket> s) {
me->addPeer(s);
});
}
}
Cluster::~Cluster() {
reset();
}
void Cluster::reset() {
for (auto x : blobs_) {
delete x.second;
}
blobs_.clear();
}
void Cluster::addPeer(shared_ptr<Socket> p) {
}
struct SyncHeader {
uint32_t blobid;
uint32_t offset;
uint32_t size;
};
void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
// Sanity check
if (offset + size > blob.size_) throw -1;
// TODO Delay send to collate many write operations?
if (blob.sockets_.size() > 0) {
SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)};
for (auto s : blob.sockets_) {
// Send over network
s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)),
std::string(&blob.data_[offset],size));
}
}
}
Blob *Cluster::_lookup(const char *uri) {
URI u(uri);
if (!u.isValid()) return NULL;
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
//if (u.getPathSegment(0) != "memory") return NULL;
if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
auto b = blobs_[u.getBaseURI()];
std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl;
if (b == nullptr) {
// Must do a p2p search for this URI...
int rpcid = rpcid_++;
for (auto p : peers_) {
p->send(P2P_FINDOWNER, ftl::net::rpc_pack(rpcid,uri));
}
int limit = 10;
while (limit >= 0 && !rpc_results_[rpcid] == nullptr) {
ftl::net::wait();
limit--;
}
if (rpc_results[rpcid]) {
// Unpack the data
auto res = rpc_results[rpcid];
} else {
// No results;
}
}
return b;
}
Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
ftl::rm::flags_t flags, const std::string &tname) {
URI u(uri);
if (!u.isValid()) return NULL;
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
if (blobs_[u.getBaseURI()] != NULL) return NULL;
Blob *b = new Blob;
b->data_ = addr;
b->size_ = size;
b->uri_ = std::string(uri);
blobs_[u.getBaseURI()] = b;
// TODO : Perhaps broadcast this new allocation?
return b;
}