summaryrefslogtreecommitdiff
path: root/ot_sync.c
blob: c204c9556524b333cff137c84400e20e6fb2d9ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
   It is considered beerware. Prost. Skol. Cheers or whatever.
   
   $id$ */

/* System */
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#ifdef WANT_THREAD_NAME_NP
#include <pthread_np.h>
#endif

/* Libowfat */
#include "scan.h"
#include "byte.h"
#include "io.h"

/* Opentracker */
#include "trackerlogic.h"
#include "ot_mutex.h"
#include "ot_sync.h"
#include "ot_stats.h"
#include "ot_iovec.h"

#ifdef WANT_SYNC_BATCH

/* Size in bytes of each output buffer that sync_make() requests from the
   iovec helpers while assembling a changeset (512 KiB) */
#define OT_SYNC_CHUNK_SIZE (512*1024)

/* Import a changeset received from an external authority.

   Expected format: d4:syncd[..]ee
   [..]:            ( 20:<info_hash><8*N>:<N peers of 8 bytes each> )+

   data  start of the bencoded changeset
   len   number of bytes available at data; the closing "ee" has to lie
         within these bytes

   Peers are handed to add_peer_to_torrent() while parsing, so peers of
   entries preceding a malformed one have already been passed on when the
   function fails.

   Returns 0 once the closing "ee" was reached, -1 on malformed input. */
int add_changeset_to_tracker( uint8_t *data, size_t len ) {
  /* Bytes not yet consumed. Every step is checked against this counter
     instead of comparing pointers, so an oversized length field can not
     wrap a pointer around and slip past the limit */
  size_t left = len;

  if( ( left < 8 ) || byte_diff( data, 8, "d4:syncd" ) )
    return -1;
  data += 8; left -= 8;

  for( ;; ) {
    ot_hash       *hash;
    unsigned long  peer_count;
    size_t         digits;

    /* Whatever does not start a new torrent entry must be the closing "ee" */
    if( ( left < 3 ) || byte_diff( data, 3, "20:" ) ) {
      if( ( left < 2 ) || byte_diff( data, 2, "ee" ) )
        return -1;
      return 0;
    }
    data += 3; left -= 3;

    /* The info hash and at least one more byte (start of the length
       indicator) have to be inside the buffer */
    if( left <= sizeof( ot_hash ) )
      return -1;
    hash  = (ot_hash*)data;
    data += sizeof( ot_hash ); left -= sizeof( ot_hash );

    /* Scan string length indicator.
       NOTE(review): scan_ulong takes no length limit; this still relies on
       the caller's buffer ending in a non-digit (the original comment states
       it is \n terminated) -- confirm against the caller */
    digits = scan_ulong( (char*)data, &peer_count );

    /* If no number was scanned or there is no room left for the colon
       following it, we fail */
    if( !digits || ( digits >= left ) )
      return -1;
    data += digits; left -= digits;

    if( *data != ':' )
      return -1;
    ++data; --left;

    /* If the peer block is empty, not divisible by 8 or claims to need
       more bytes than are left, we fail */
    if( !peer_count || ( peer_count & 7 ) || ( peer_count > left ) )
      return -1;

    while( peer_count > 0 ) {
      add_peer_to_torrent( hash, (ot_peer*)data, 1 );
      data += 8; left -= 8; peer_count -= 8;
    }
  }
}

/* Proposed output format
   d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
*/
static void sync_make( int *iovec_entries, struct iovec **iovector ) {
  int    bucket;
  char  *r, *re;

  /* Setup return vector... */
  *iovec_entries = 0;
  *iovector = NULL;
  if( !( r = iovec_increase( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE ) ) )
    return;

  /* ... and pointer to end of current output buffer.
     This works as a low watermark */
  re = r + OT_SYNC_CHUNK_SIZE;

  memmove( r, "d4:syncd", 8 ); r += 8;

  /* For each bucket... */
  for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
    /* Get exclusive access to that bucket */
    ot_vector *torrents_list = mutex_bucket_lock( bucket );
    size_t tor_offset;

    /* For each torrent in this bucket.. */
    for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
      /* Address torrents members */
      ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
      ot_hash     *hash      =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
      const size_t byte_count = sizeof(ot_peer) * peer_list->changeset.size;

      /* If we reached our low watermark in buffer... */
      if( re - r <= (ssize_t)(/* strlen( "20:" ) == */ 3 + sizeof( ot_hash ) + /* strlen_max( "%zd" ) == */ 12 + byte_count ) ) {

        /* Allocate a fresh output buffer at the end of our buffers list
           release bucket and return, if that fails */
        if( !( r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SYNC_CHUNK_SIZE ) ) )
          return mutex_bucket_unlock( bucket );

        /* Adjust new end of output buffer */
        re = r + OT_SYNC_CHUNK_SIZE;
      }

      *r++ = '2'; *r++ = '0'; *r++ = ':';
      memmove( r, hash, sizeof( ot_hash ) ); r += sizeof( ot_hash );
      r += sprintf( r, "%zd:", byte_count );
      memmove( r, peer_list->changeset.data, byte_count ); r += byte_count;
    }

    /* All torrents done: release lock on currenct bucket */
    mutex_bucket_unlock( bucket );
  }

  /* Close bencoded sync dictionary */
  *r++='e'; *r++='e';

  /* Release unused memory in current output buffer */
  iovec_fixlast( iovec_entries, iovector, r );
}

/* Thread entry point of the batch sync worker.
   Forever: pop a TASK_SYNC_OUT task from the work queue, build the
   changeset with sync_make(), account for it in the stats and push the
   result back under the id of the task. If pushing the result is refused
   (non-zero return), the buffers are freed here.
   The thread argument is not used.
*/
static void * sync_worker( void * args) {
  struct iovec *result_vector;
  int           result_entries;

  /* Silence the unused parameter warning */
  (void)args;

  for( ;; ) {
    ot_tasktype wanted = TASK_SYNC_OUT;
    ot_taskid   id     = mutex_workqueue_poptask( &wanted );

    sync_make( &result_entries, &result_vector );
    stats_issue_event( EVENT_SYNC_OUT, FLAG_TCP, iovec_length( &result_entries, &result_vector ) );

    /* Nobody took the result, so its buffers are still ours to release */
    if( mutex_workqueue_pushresult( id, result_entries, result_vector ) )
      iovec_free( &result_entries, &result_vector );
  }

  /* Never reached, keeps the signature satisfied */
  return NULL;
}

/* Handle of the batch sync worker thread, set by sync_init() */
static pthread_t thread_id;

/* Start the batch sync worker thread.
   If the thread can not be created this is reported on stderr and the
   handle is left alone: pthread_create() does not promise a usable thread
   id on failure, so it must not be passed on for naming. */
void sync_init( ) {
  if( pthread_create( &thread_id, NULL, sync_worker, NULL ) ) {
    fprintf( stderr, "Could not spawn batch sync worker thread.\n" );
    return;
  }
#ifdef WANT_THREAD_NAME_NP
  pthread_set_name_np( thread_id, "opentracker (batchsync)");
#endif  
}

/* Request cancellation of the batch sync worker thread.
   The thread is only cancelled here, it is not joined. */
void sync_deinit( ) {
  pthread_cancel( thread_id );
}

/* Queue a TASK_SYNC_OUT task for the given socket on the work queue;
   sync_worker() pops tasks of that type and produces the changeset. */
void sync_deliver( int64 socket ) {
  mutex_workqueue_pushtask( socket, TASK_SYNC_OUT );
}

#endif

/* Version identification string of this source file. Defined outside the
   WANT_SYNC_BATCH guard, so it exists in every build.
   NOTE(review): $Source$ and $Revision$ look like keywords expanded by the
   version control system -- confirm. */
const char *g_version_sync_c = "$Source$: $Revision$\n";